This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new a8b93805ea FakeSource support generate different CatalogTable for
MultipleTable (#5766)
a8b93805ea is described below
commit a8b93805eacc95e970941ecd875f8a1dd593293d
Author: Wenjun Ruan <[email protected]>
AuthorDate: Thu Nov 16 15:20:09 2023 +0800
FakeSource support generate different CatalogTable for MultipleTable (#5766)
---
docs/en/connector-v2/source/FakeSource.md | 120 +++--
.../seatunnel/fake/config/FakeConfig.java | 519 ++++++++++-----------
.../seatunnel/fake/config/FakeOption.java | 14 +-
.../fake/config/MultipleTableFakeSourceConfig.java | 73 +++
.../seatunnel/fake/source/FakeDataGenerator.java | 40 +-
.../seatunnel/fake/source/FakeSource.java | 44 +-
.../seatunnel/fake/source/FakeSourceFactory.java | 8 +-
.../seatunnel/fake/source/FakeSourceReader.java | 44 +-
.../seatunnel/fake/source/FakeSourceSplit.java | 5 +-
.../fake/source/FakeSourceSplitEnumerator.java | 32 +-
.../config/MultipleTableFakeSourceConfigTest.java | 44 ++
.../fake/source/FakeDataGeneratorTest.java | 50 +-
.../src/test/resources/multiple_table.conf | 76 +++
.../catalog/MaxComputeDataTypeConvertor.java | 4 +-
.../core/starter/utils/ConfigBuilder.java | 4 +-
.../resources/fake_to_assert_with_tablenames.conf | 104 +++--
.../fake_to_console_with_multitable_mode.conf | 54 ++-
.../e2e/connector/pulsar/PulsarBatchIT.java | 38 +-
.../src/test/resources/fake_source.conf} | 46 +-
19 files changed, 774 insertions(+), 545 deletions(-)
diff --git a/docs/en/connector-v2/source/FakeSource.md
b/docs/en/connector-v2/source/FakeSource.md
index af89dceada..dff5e61bfa 100644
--- a/docs/en/connector-v2/source/FakeSource.md
+++ b/docs/en/connector-v2/source/FakeSource.md
@@ -18,45 +18,45 @@ just for some test cases such as type conversion or
connector new feature testin
## Source Options
-| Name | Type | Required | Default |
Description
|
-|---------------------|----------|----------|-------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| schema | config | yes | - | Define
Schema information
|
-| rows | config | no | - | The
row list of fake data output per degree of parallelism see title `Options rows
Case`.
|
-| row.num | int | no | 5 | The
total number of data generated per degree of parallelism
|
-| split.num | int | no | 1 | the
number of splits generated by the enumerator for each degree of parallelism
|
-| split.read-interval | long | no | 1 | The
interval(mills) between two split reads in a reader
|
-| map.size | int | no | 5 | The
size of `map` type that connector generated
|
-| array.size | int | no | 5 | The
size of `array` type that connector generated
|
-| bytes.length | int | no | 5 | The
length of `bytes` type that connector generated
|
-| string.length | int | no | 5 | The
length of `string` type that connector generated
|
-| string.fake.mode | string | no | range | The
fake mode of generating string data, support `range` and `template`, default
`range`,if use configured it to `template`, user should also configured
`string.template` option
|
-| string.template | list | no | - | The
template list of string type that connector generated, if user configured it,
connector will randomly select an item from the template list
|
-| tinyint.fake.mode | string | no | range | The
fake mode of generating tinyint data, support `range` and `template`, default
`range`,if use configured it to `template`, user should also configured
`tinyint.template` option
|
-| tinyint.min | tinyint | no | 0 | The
min value of tinyint data that connector generated
|
-| tinyint.max | tinyint | no | 127 | The
max value of tinyint data that connector generated
|
-| tinyint.template | list | no | - | The
template list of tinyint type that connector generated, if user configured it,
connector will randomly select an item from the template list
|
-| smallint.fake.mode | string | no | range | The
fake mode of generating smallint data, support `range` and `template`, default
`range`,if use configured it to `template`, user should also configured
`smallint.template` option
|
-| smallint.min | smallint | no | 0 | The
min value of smallint data that connector generated
|
-| smallint.max | smallint | no | 32767 | The
max value of smallint data that connector generated
|
-| smallint.template | list | no | - | The
template list of smallint type that connector generated, if user configured it,
connector will randomly select an item from the template list
|
-| int.fake.template | string | no | range | The
fake mode of generating int data, support `range` and `template`, default
`range`,if use configured it to `template`, user should also configured
`int.template` option
|
-| int.min | int | no | 0 | The
min value of int data that connector generated
|
-| int.max | int | no | 0x7fffffff | The
max value of int data that connector generated
|
-| int.template | list | no | - | The
template list of int type that connector generated, if user configured it,
connector will randomly select an item from the template list
|
-| bigint.fake.mode | string | no | range | The
fake mode of generating bigint data, support `range` and `template`, default
`range`,if use configured it to `template`, user should also configured
`bigint.template` option
|
-| bigint.min | bigint | no | 0 | The
min value of bigint data that connector generated
|
-| bigint.max | bigint | no | 0x7fffffffffffffff | The
max value of bigint data that connector generated
|
-| bigint.template | list | no | - | The
template list of bigint type that connector generated, if user configured it,
connector will randomly select an item from the template list
|
-| float.fake.mode | string | no | range | The
fake mode of generating float data, support `range` and `template`, default
`range`,if use configured it to `template`, user should also configured
`float.template` option
|
-| float.min | float | no | 0 | The
min value of float data that connector generated
|
-| float.max | float | no | 0x1.fffffeP+127 | The
max value of float data that connector generated
|
-| float.template | list | no | - | The
template list of float type that connector generated, if user configured it,
connector will randomly select an item from the template list
|
-| double.fake.mode | string | no | range | The
fake mode of generating float data, support `range` and `template`, default
`range`,if use configured it to `template`, user should also configured
`double.template` option
|
-| double.min | double | no | 0 | The
min value of double data that connector generated
|
-| double.max | double | no | 0x1.fffffffffffffP+1023 | The
max value of double data that connector generated
|
-| double.template | list | no | - | The
template list of double type that connector generated, if user configured it,
connector will randomly select an item from the template list
|
-| table-names | list | no | - | The
table list that connector generated, used to simulate multi-table
scenarios.<br/> This option will override the `table` option in the `schema`
option. For example, if you configure the `table-names` option as follows, the
connector will generate data for the `test.table1` and `test.table2` tables,
the `database.schema.table` will be dropFor details, see title `Options
table-names Case`. |
-| common-options | | no | - | Source
plugin common parameters, please refer to [Source Common
Options](common-options.md) for details
|
+| Name | Type | Required | Default |
Description
|
+|---------------------|----------|----------|-------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| tables_configs | list | no | - | Define
multiple FakeSource tables; each item can contain the whole fake source config
described below
|
+| schema | config | yes | - | Define
Schema information
|
+| rows | config | no | - | The
row list of fake data output per degree of parallelism; see the section `Options rows
Case`.
|
+| row.num | int | no | 5 | The
total number of data generated per degree of parallelism
|
+| split.num | int | no | 1 | the
number of splits generated by the enumerator for each degree of parallelism
|
+| split.read-interval | long | no | 1 | The
interval (in milliseconds) between two split reads in a reader
|
+| map.size | int | no | 5 | The
size of `map` type that connector generated
|
+| array.size | int | no | 5 | The
size of `array` type that connector generated
|
+| bytes.length | int | no | 5 | The
length of `bytes` type that connector generated
|
+| string.length | int | no | 5 | The
length of `string` type that connector generated
|
+| string.fake.mode | string | no | range | The
fake mode of generating string data, supports `range` and `template`, default
`range`. If the user sets it to `template`, the user should also configure the
`string.template` option |
+| string.template | list | no | - | The
template list of string type that connector generated, if user configured it,
connector will randomly select an item from the template list
|
+| tinyint.fake.mode | string | no | range | The
fake mode of generating tinyint data, supports `range` and `template`, default
`range`. If the user sets it to `template`, the user should also configure the
`tinyint.template` option |
+| tinyint.min | tinyint | no | 0 | The
min value of tinyint data that connector generated
|
+| tinyint.max | tinyint | no | 127 | The
max value of tinyint data that connector generated
|
+| tinyint.template | list | no | - | The
template list of tinyint type that connector generated, if user configured it,
connector will randomly select an item from the template list
|
+| smallint.fake.mode | string | no | range | The
fake mode of generating smallint data, supports `range` and `template`, default
`range`. If the user sets it to `template`, the user should also configure the
`smallint.template` option |
+| smallint.min | smallint | no | 0 | The
min value of smallint data that connector generated
|
+| smallint.max | smallint | no | 32767 | The
max value of smallint data that connector generated
|
+| smallint.template | list | no | - | The
template list of smallint type that connector generated, if user configured it,
connector will randomly select an item from the template list
|
+| int.fake.mode | string | no | range | The
fake mode of generating int data, supports `range` and `template`, default
`range`. If the user sets it to `template`, the user should also configure the
`int.template` option |
+| int.min | int | no | 0 | The
min value of int data that connector generated
|
+| int.max | int | no | 0x7fffffff | The
max value of int data that connector generated
|
+| int.template | list | no | - | The
template list of int type that connector generated, if user configured it,
connector will randomly select an item from the template list
|
+| bigint.fake.mode | string | no | range | The
fake mode of generating bigint data, supports `range` and `template`, default
`range`. If the user sets it to `template`, the user should also configure the
`bigint.template` option |
+| bigint.min | bigint | no | 0 | The
min value of bigint data that connector generated
|
+| bigint.max | bigint | no | 0x7fffffffffffffff | The
max value of bigint data that connector generated
|
+| bigint.template | list | no | - | The
template list of bigint type that connector generated, if user configured it,
connector will randomly select an item from the template list
|
+| float.fake.mode | string | no | range | The
fake mode of generating float data, supports `range` and `template`, default
`range`. If the user sets it to `template`, the user should also configure the
`float.template` option |
+| float.min | float | no | 0 | The
min value of float data that connector generated
|
+| float.max | float | no | 0x1.fffffeP+127 | The
max value of float data that connector generated
|
+| float.template | list | no | - | The
template list of float type that connector generated, if user configured it,
connector will randomly select an item from the template list
|
+| double.fake.mode | string | no | range | The
fake mode of generating double data, supports `range` and `template`, default
`range`. If the user sets it to `template`, the user should also configure the
`double.template` option |
+| double.min | double | no | 0 | The
min value of double data that connector generated
|
+| double.max | double | no | 0x1.fffffffffffffP+1023 | The
max value of double data that connector generated
|
+| double.template | list | no | - | The
template list of double type that connector generated, if user configured it,
connector will randomly select an item from the template list
|
+| common-options | | no | - | Source
plugin common parameters, please refer to [Source Common
Options](common-options.md) for details
|
## Task Example
@@ -311,18 +311,38 @@ FakeSource {
```hocon
FakeSource {
- table-names = ["test.table1", "test.table2"]
- schema {
- fields {
- c_string = string
- c_tinyint = tinyint
- c_smallint = smallint
- c_int = int
- c_bigint = bigint
- c_float = float
- c_double = double
+ tables_configs = [
+ {
+ row.num = 16
+ schema {
+ table = "test.table1"
+ fields {
+ c_string = string
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ }
+ }
+ },
+ {
+ row.num = 17
+ schema {
+ table = "test.table2"
+ fields {
+ c_string = string
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ }
+ }
}
- }
+ ]
}
```
diff --git
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java
index cec765b3f2..ab1c829536 100644
---
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java
+++
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java
@@ -17,13 +17,11 @@
package org.apache.seatunnel.connectors.seatunnel.fake.config;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions;
-
-import org.apache.seatunnel.api.table.catalog.CatalogOptions;
-import org.apache.seatunnel.api.table.catalog.TableIdentifier;
-import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import org.apache.seatunnel.common.utils.JsonUtils;
import
org.apache.seatunnel.connectors.seatunnel.fake.exception.FakeConnectorException;
import lombok.AllArgsConstructor;
@@ -33,6 +31,7 @@ import lombok.Getter;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import static
org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.ARRAY_SIZE;
import static
org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.BIGINT_FAKE_MODE;
@@ -149,277 +148,257 @@ public class FakeConfig implements Serializable {
private List<RowData> fakeRows;
- @Builder.Default private List<TableIdentifier> tableIdentifiers = new
ArrayList<>();
+ private CatalogTable catalogTable;
- // todo: use ReadonlyConfig
- public static FakeConfig buildWithConfig(Config config) {
+ public static FakeConfig buildWithConfig(ReadonlyConfig readonlyConfig) {
FakeConfigBuilder builder = FakeConfig.builder();
- if (config.hasPath(ROW_NUM.key())) {
- builder.rowNum(config.getInt(ROW_NUM.key()));
- }
- if (config.hasPath(SPLIT_NUM.key())) {
- builder.splitNum(config.getInt(SPLIT_NUM.key()));
- }
- if (config.hasPath(SPLIT_READ_INTERVAL.key())) {
-
builder.splitReadInterval(config.getInt(SPLIT_READ_INTERVAL.key()));
- }
- if (config.hasPath(MAP_SIZE.key())) {
- builder.mapSize(config.getInt(MAP_SIZE.key()));
- }
- if (config.hasPath(ARRAY_SIZE.key())) {
- builder.arraySize(config.getInt(ARRAY_SIZE.key()));
- }
- if (config.hasPath(BYTES_LENGTH.key())) {
- builder.bytesLength(config.getInt(BYTES_LENGTH.key()));
- }
- if (config.hasPath(STRING_LENGTH.key())) {
- builder.stringLength(config.getInt(STRING_LENGTH.key()));
- }
- if (config.hasPath(ROWS.key())) {
- List<? extends Config> configs = config.getConfigList(ROWS.key());
+ builder.rowNum(readonlyConfig.get(ROW_NUM));
+ builder.splitNum(readonlyConfig.get(SPLIT_NUM));
+ builder.splitReadInterval(readonlyConfig.get(SPLIT_READ_INTERVAL));
+ builder.mapSize(readonlyConfig.get(MAP_SIZE));
+ builder.arraySize(readonlyConfig.get(ARRAY_SIZE));
+ builder.bytesLength(readonlyConfig.get(BYTES_LENGTH));
+ builder.stringLength(readonlyConfig.get(STRING_LENGTH));
+
+ if (readonlyConfig.getOptional(ROWS).isPresent()) {
+ List<Map<String, Object>> configs = readonlyConfig.get(ROWS);
List<RowData> rows = new ArrayList<>(configs.size());
- ConfigRenderOptions options = ConfigRenderOptions.concise();
- for (Config configItem : configs) {
- String fieldsJson =
configItem.getValue(RowData.KEY_FIELDS).render(options);
- RowData rowData = new
RowData(configItem.getString(RowData.KEY_KIND), fieldsJson);
+ for (Map<String, Object> configItem : configs) {
+ String fieldsJson =
JsonUtils.toJsonString(configItem.get(RowData.KEY_FIELDS));
+ RowData rowData =
+ new
RowData(configItem.get(RowData.KEY_KIND).toString(), fieldsJson);
rows.add(rowData);
}
builder.fakeRows(rows);
}
- if (config.hasPath(STRING_TEMPLATE.key())) {
-
builder.stringTemplate(config.getStringList(STRING_TEMPLATE.key()));
- }
- if (config.hasPath(TINYINT_TEMPLATE.key())) {
- builder.tinyintTemplate(config.getIntList(TINYINT_TEMPLATE.key()));
- }
- if (config.hasPath(SMALLINT_TEMPLATE.key())) {
-
builder.smallintTemplate(config.getIntList(SMALLINT_TEMPLATE.key()));
- }
- if (config.hasPath(INT_TEMPLATE.key())) {
- builder.intTemplate(config.getIntList(INT_TEMPLATE.key()));
- }
- if (config.hasPath(BIGINT_TEMPLATE.key())) {
- builder.bigTemplate(config.getLongList(BIGINT_TEMPLATE.key()));
- }
- if (config.hasPath(FLOAT_TEMPLATE.key())) {
- builder.floatTemplate(config.getDoubleList(FLOAT_TEMPLATE.key()));
- }
- if (config.hasPath(DOUBLE_TEMPLATE.key())) {
-
builder.doubleTemplate(config.getDoubleList(DOUBLE_TEMPLATE.key()));
- }
- if (config.hasPath(DATE_YEAR_TEMPLATE.key())) {
-
builder.dateYearTemplate(config.getIntList(DATE_YEAR_TEMPLATE.key()));
- }
- if (config.hasPath(DATE_MONTH_TEMPLATE.key())) {
-
builder.dateMonthTemplate(config.getIntList(DATE_MONTH_TEMPLATE.key()));
- }
- if (config.hasPath(DATE_DAY_TEMPLATE.key())) {
-
builder.dateDayTemplate(config.getIntList(DATE_DAY_TEMPLATE.key()));
- }
- if (config.hasPath(TIME_HOUR_TEMPLATE.key())) {
-
builder.timeHourTemplate(config.getIntList(TIME_HOUR_TEMPLATE.key()));
- }
- if (config.hasPath(TIME_MINUTE_TEMPLATE.key())) {
-
builder.timeMinuteTemplate(config.getIntList(TIME_MINUTE_TEMPLATE.key()));
- }
- if (config.hasPath(TIME_SECOND_TEMPLATE.key())) {
-
builder.timeSecondTemplate(config.getIntList(TIME_SECOND_TEMPLATE.key()));
- }
- if (config.hasPath(TINYINT_MIN.key())) {
- int tinyintMin = config.getInt(TINYINT_MIN.key());
- if (tinyintMin < TINYINT_MIN.defaultValue()
- || tinyintMin > TINYINT_MAX.defaultValue()) {
- throw new FakeConnectorException(
- CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
- TINYINT_MIN.key()
- + " should >= "
- + TINYINT_MIN.defaultValue()
- + " and <= "
- + TINYINT_MAX.defaultValue());
- }
- builder.tinyintMin(tinyintMin);
- }
- if (config.hasPath(TINYINT_MAX.key())) {
- int tinyintMax = config.getInt(TINYINT_MAX.key());
- if (tinyintMax < TINYINT_MIN.defaultValue()
- || tinyintMax > TINYINT_MAX.defaultValue()) {
- throw new FakeConnectorException(
- CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
- TINYINT_MAX.key()
- + " should >= "
- + TINYINT_MIN.defaultValue()
- + " and <= "
- + TINYINT_MAX.defaultValue());
- }
- builder.tinyintMax(tinyintMax);
- }
- if (config.hasPath(SMALLINT_MIN.key())) {
- int smallintMin = config.getInt(SMALLINT_MIN.key());
- if (smallintMin < SMALLINT_MIN.defaultValue()
- || smallintMin > SMALLINT_MAX.defaultValue()) {
- throw new FakeConnectorException(
- CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
- SMALLINT_MIN.key()
- + " should >= "
- + SMALLINT_MIN.defaultValue()
- + " and <= "
- + SMALLINT_MAX.defaultValue());
- }
- builder.smallintMin(smallintMin);
- }
- if (config.hasPath(SMALLINT_MAX.key())) {
- int smallintMax = config.getInt(SMALLINT_MAX.key());
- if (smallintMax < SMALLINT_MIN.defaultValue()
- || smallintMax > SMALLINT_MAX.defaultValue()) {
- throw new FakeConnectorException(
- CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
- SMALLINT_MAX.key()
- + " should >= "
- + SMALLINT_MIN.defaultValue()
- + " and <= "
- + SMALLINT_MAX.defaultValue());
- }
- builder.smallintMax(smallintMax);
- }
- if (config.hasPath(INT_MIN.key())) {
- int intMin = config.getInt(INT_MIN.key());
- if (intMin < INT_MIN.defaultValue() || intMin >
INT_MAX.defaultValue()) {
- throw new FakeConnectorException(
- CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
- INT_MIN.key()
- + " should >= "
- + INT_MIN.defaultValue()
- + " and <= "
- + INT_MAX.defaultValue());
- }
- builder.intMin(intMin);
- }
- if (config.hasPath(INT_MAX.key())) {
- int intMax = config.getInt(INT_MAX.key());
- if (intMax < INT_MIN.defaultValue() || intMax >
INT_MAX.defaultValue()) {
- throw new FakeConnectorException(
- CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
- INT_MAX.key()
- + " should >= "
- + INT_MIN.defaultValue()
- + " and <= "
- + INT_MAX.defaultValue());
- }
- builder.intMax(intMax);
- }
- if (config.hasPath(BIGINT_MIN.key())) {
- long bigintMin = config.getLong(BIGINT_MIN.key());
- if (bigintMin < BIGINT_MIN.defaultValue() || bigintMin >
BIGINT_MAX.defaultValue()) {
- throw new FakeConnectorException(
- CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
- BIGINT_MIN.key()
- + " should >= "
- + BIGINT_MIN.defaultValue()
- + " and <= "
- + BIGINT_MAX.defaultValue());
- }
- builder.bigintMin(bigintMin);
- }
- if (config.hasPath(BIGINT_MAX.key())) {
- long bigintMax = config.getLong(BIGINT_MAX.key());
- if (bigintMax < BIGINT_MIN.defaultValue() || bigintMax >
BIGINT_MAX.defaultValue()) {
- throw new FakeConnectorException(
- CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
- BIGINT_MAX.key()
- + " should >= "
- + BIGINT_MIN.defaultValue()
- + " and <= "
- + BIGINT_MAX.defaultValue());
- }
- builder.bigintMax(bigintMax);
- }
- if (config.hasPath(FLOAT_MIN.key())) {
- double floatMin = config.getDouble(FLOAT_MIN.key());
- if (floatMin < FLOAT_MIN.defaultValue() || floatMin >
FLOAT_MAX.defaultValue()) {
- throw new FakeConnectorException(
- CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
- FLOAT_MIN.key()
- + " should >= "
- + FLOAT_MIN.defaultValue()
- + " and <= "
- + FLOAT_MAX.defaultValue());
- }
- builder.floatMin(floatMin);
- }
- if (config.hasPath(FLOAT_MAX.key())) {
- double floatMax = config.getDouble(FLOAT_MAX.key());
- if (floatMax < FLOAT_MIN.defaultValue() || floatMax >
FLOAT_MAX.defaultValue()) {
- throw new FakeConnectorException(
- CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
- FLOAT_MAX.key()
- + " should >= "
- + FLOAT_MIN.defaultValue()
- + " and <= "
- + FLOAT_MAX.defaultValue());
- }
- builder.floatMax(floatMax);
- }
- if (config.hasPath(DOUBLE_MIN.key())) {
- double doubleMin = config.getDouble(DOUBLE_MIN.key());
- if (doubleMin < DOUBLE_MIN.defaultValue() || doubleMin >
DOUBLE_MAX.defaultValue()) {
- throw new FakeConnectorException(
- CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
- DOUBLE_MIN.key()
- + " should >= "
- + DOUBLE_MIN.defaultValue()
- + " and <= "
- + DOUBLE_MAX.defaultValue());
- }
- builder.doubleMin(doubleMin);
- }
- if (config.hasPath(DOUBLE_MAX.key())) {
- double doubleMax = config.getDouble(DOUBLE_MAX.key());
- if (doubleMax < DOUBLE_MIN.defaultValue() || doubleMax >
DOUBLE_MAX.defaultValue()) {
- throw new FakeConnectorException(
- CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
- DOUBLE_MAX.key()
- + " should >= "
- + DOUBLE_MIN.defaultValue()
- + " and <= "
- + DOUBLE_MAX.defaultValue());
- }
- builder.doubleMax(doubleMax);
- }
- if (config.hasPath(STRING_FAKE_MODE.key())) {
- builder.stringFakeMode(
-
FakeOption.FakeMode.parse(config.getString(STRING_FAKE_MODE.key())));
- }
- if (config.hasPath(TINYINT_FAKE_MODE.key())) {
- builder.tinyintFakeMode(
-
FakeOption.FakeMode.parse(config.getString(TINYINT_FAKE_MODE.key())));
- }
- if (config.hasPath(SMALLINT_FAKE_MODE.key())) {
- builder.smallintFakeMode(
-
FakeOption.FakeMode.parse(config.getString(SMALLINT_FAKE_MODE.key())));
- }
- if (config.hasPath(INT_FAKE_MODE.key())) {
-
builder.intFakeMode(FakeOption.FakeMode.parse(config.getString(INT_FAKE_MODE.key())));
- }
- if (config.hasPath(BIGINT_FAKE_MODE.key())) {
- builder.bigintFakeMode(
-
FakeOption.FakeMode.parse(config.getString(BIGINT_FAKE_MODE.key())));
- }
- if (config.hasPath(FLOAT_FAKE_MODE.key())) {
- builder.floatFakeMode(
-
FakeOption.FakeMode.parse(config.getString(FLOAT_FAKE_MODE.key())));
- }
- if (config.hasPath(DOUBLE_FAKE_MODE.key())) {
- builder.doubleFakeMode(
-
FakeOption.FakeMode.parse(config.getString(DOUBLE_FAKE_MODE.key())));
- }
- if (config.hasPath(CatalogOptions.TABLE_NAMES.key())) {
- List<String> tableNames =
config.getStringList(CatalogOptions.TABLE_NAMES.key());
- List<TableIdentifier> tableIdentifiers = new
ArrayList<>(tableNames.size());
- for (String tableName : tableNames) {
- tableIdentifiers.add(TableIdentifier.of("FakeSource",
TablePath.of(tableName)));
- }
- builder.tableIdentifiers(tableIdentifiers);
- }
+
readonlyConfig.getOptional(STRING_TEMPLATE).ifPresent(builder::stringTemplate);
+
readonlyConfig.getOptional(TINYINT_TEMPLATE).ifPresent(builder::tinyintTemplate);
+
readonlyConfig.getOptional(SMALLINT_TEMPLATE).ifPresent(builder::smallintTemplate);
+
readonlyConfig.getOptional(INT_TEMPLATE).ifPresent(builder::intTemplate);
+
readonlyConfig.getOptional(BIGINT_TEMPLATE).ifPresent(builder::bigTemplate);
+
readonlyConfig.getOptional(FLOAT_TEMPLATE).ifPresent(builder::floatTemplate);
+
readonlyConfig.getOptional(DOUBLE_TEMPLATE).ifPresent(builder::doubleTemplate);
+
readonlyConfig.getOptional(DATE_YEAR_TEMPLATE).ifPresent(builder::dateYearTemplate);
+
readonlyConfig.getOptional(DATE_MONTH_TEMPLATE).ifPresent(builder::dateMonthTemplate);
+
readonlyConfig.getOptional(DATE_DAY_TEMPLATE).ifPresent(builder::dateDayTemplate);
+
readonlyConfig.getOptional(TIME_HOUR_TEMPLATE).ifPresent(builder::timeHourTemplate);
+
readonlyConfig.getOptional(TIME_MINUTE_TEMPLATE).ifPresent(builder::timeMinuteTemplate);
+
readonlyConfig.getOptional(TIME_SECOND_TEMPLATE).ifPresent(builder::timeSecondTemplate);
+
+ readonlyConfig
+ .getOptional(TINYINT_MIN)
+ .ifPresent(
+ tinyintMin -> {
+ if (tinyintMin < TINYINT_MIN.defaultValue()
+ || tinyintMin >
TINYINT_MAX.defaultValue()) {
+ throw new FakeConnectorException(
+
CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
+ TINYINT_MIN.key()
+ + " should >= "
+ + TINYINT_MIN.defaultValue()
+ + " and <= "
+ + TINYINT_MAX.defaultValue());
+ }
+ builder.tinyintMin(tinyintMin);
+ });
+
+ readonlyConfig
+ .getOptional(TINYINT_MAX)
+ .ifPresent(
+ tinyintMax -> {
+ if (tinyintMax < TINYINT_MIN.defaultValue()
+ || tinyintMax >
TINYINT_MAX.defaultValue()) {
+ throw new FakeConnectorException(
+
CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
+ TINYINT_MAX.key()
+ + " should >= "
+ + TINYINT_MIN.defaultValue()
+ + " and <= "
+ + TINYINT_MAX.defaultValue());
+ }
+ builder.tinyintMax(tinyintMax);
+ });
+
+ readonlyConfig
+ .getOptional(SMALLINT_MIN)
+ .ifPresent(
+ smallintMin -> {
+ if (smallintMin < SMALLINT_MIN.defaultValue()
+ || smallintMin >
SMALLINT_MAX.defaultValue()) {
+ throw new FakeConnectorException(
+
CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
+ SMALLINT_MIN.key()
+ + " should >= "
+ + SMALLINT_MIN.defaultValue()
+ + " and <= "
+ + SMALLINT_MAX.defaultValue());
+ }
+ builder.smallintMin(smallintMin);
+ });
+
+ readonlyConfig
+ .getOptional(SMALLINT_MAX)
+ .ifPresent(
+ smallintMax -> {
+ if (smallintMax < SMALLINT_MIN.defaultValue()
+ || smallintMax >
SMALLINT_MAX.defaultValue()) {
+ throw new FakeConnectorException(
+
CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
+ SMALLINT_MAX.key()
+ + " should >= "
+ + SMALLINT_MIN.defaultValue()
+ + " and <= "
+ + SMALLINT_MAX.defaultValue());
+ }
+ builder.smallintMax(smallintMax);
+ });
+
+ readonlyConfig
+ .getOptional(INT_MIN)
+ .ifPresent(
+ intMin -> {
+ if (intMin < INT_MIN.defaultValue()
+ || intMin > INT_MAX.defaultValue()) {
+ throw new FakeConnectorException(
+
CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
+ INT_MIN.key()
+ + " should >= "
+ + INT_MIN.defaultValue()
+ + " and <= "
+ + INT_MAX.defaultValue());
+ }
+ builder.intMin(intMin);
+ });
+
+ readonlyConfig
+ .getOptional(INT_MAX)
+ .ifPresent(
+ intMax -> {
+ if (intMax < INT_MIN.defaultValue()
+ || intMax > INT_MAX.defaultValue()) {
+ throw new FakeConnectorException(
+
CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
+ INT_MAX.key()
+ + " should >= "
+ + INT_MIN.defaultValue()
+ + " and <= "
+ + INT_MAX.defaultValue());
+ }
+ builder.intMax(intMax);
+ });
+
+ readonlyConfig
+ .getOptional(BIGINT_MIN)
+ .ifPresent(
+ bigintMin -> {
+ if (bigintMin < BIGINT_MIN.defaultValue()
+ || bigintMin > BIGINT_MAX.defaultValue()) {
+ throw new FakeConnectorException(
+
CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
+ BIGINT_MIN.key()
+ + " should >= "
+ + BIGINT_MIN.defaultValue()
+ + " and <= "
+ + BIGINT_MAX.defaultValue());
+ }
+ builder.bigintMin(bigintMin);
+ });
+
+ readonlyConfig
+ .getOptional(BIGINT_MAX)
+ .ifPresent(
+ bigintMax -> {
+ if (bigintMax < BIGINT_MIN.defaultValue()
+ || bigintMax > BIGINT_MAX.defaultValue()) {
+ throw new FakeConnectorException(
+
CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
+ BIGINT_MAX.key()
+ + " should >= "
+ + BIGINT_MIN.defaultValue()
+ + " and <= "
+ + BIGINT_MAX.defaultValue());
+ }
+ builder.bigintMax(bigintMax);
+ });
+
+ readonlyConfig
+ .getOptional(FLOAT_MIN)
+ .ifPresent(
+ floatMin -> {
+ if (floatMin < FLOAT_MIN.defaultValue()
+ || floatMin > FLOAT_MAX.defaultValue()) {
+ throw new FakeConnectorException(
+
CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
+ FLOAT_MIN.key()
+ + " should >= "
+ + FLOAT_MIN.defaultValue()
+ + " and <= "
+ + FLOAT_MAX.defaultValue());
+ }
+ builder.floatMin(floatMin);
+ });
+
+ readonlyConfig
+ .getOptional(FLOAT_MAX)
+ .ifPresent(
+ floatMax -> {
+ if (floatMax < FLOAT_MIN.defaultValue()
+ || floatMax > FLOAT_MAX.defaultValue()) {
+ throw new FakeConnectorException(
+
CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
+ FLOAT_MAX.key()
+ + " should >= "
+ + FLOAT_MIN.defaultValue()
+ + " and <= "
+ + FLOAT_MAX.defaultValue());
+ }
+ builder.floatMax(floatMax);
+ });
+
+ readonlyConfig
+ .getOptional(DOUBLE_MIN)
+ .ifPresent(
+ doubleMin -> {
+ if (doubleMin < DOUBLE_MIN.defaultValue()
+ || doubleMin > DOUBLE_MAX.defaultValue()) {
+ throw new FakeConnectorException(
+
CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
+ DOUBLE_MIN.key()
+ + " should >= "
+ + DOUBLE_MIN.defaultValue()
+ + " and <= "
+ + DOUBLE_MAX.defaultValue());
+ }
+ builder.doubleMin(doubleMin);
+ });
+
+ readonlyConfig
+ .getOptional(DOUBLE_MAX)
+ .ifPresent(
+ doubleMax -> {
+ if (doubleMax < DOUBLE_MIN.defaultValue()
+ || doubleMax > DOUBLE_MAX.defaultValue()) {
+ throw new FakeConnectorException(
+
CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
+ DOUBLE_MAX.key()
+ + " should >= "
+ + DOUBLE_MIN.defaultValue()
+ + " and <= "
+ + DOUBLE_MAX.defaultValue());
+ }
+ builder.doubleMax(doubleMax);
+ });
+
+
readonlyConfig.getOptional(STRING_FAKE_MODE).ifPresent(builder::stringFakeMode);
+
readonlyConfig.getOptional(TINYINT_FAKE_MODE).ifPresent(builder::tinyintFakeMode);
+
readonlyConfig.getOptional(SMALLINT_FAKE_MODE).ifPresent(builder::smallintFakeMode);
+
readonlyConfig.getOptional(INT_FAKE_MODE).ifPresent(builder::intFakeMode);
+
readonlyConfig.getOptional(BIGINT_FAKE_MODE).ifPresent(builder::bigintFakeMode);
+
readonlyConfig.getOptional(FLOAT_FAKE_MODE).ifPresent(builder::floatFakeMode);
+
readonlyConfig.getOptional(DOUBLE_FAKE_MODE).ifPresent(builder::doubleFakeMode);
+
+ builder.catalogTable(CatalogTableUtil.buildWithConfig("FakeSource",
readonlyConfig));
+
return builder.build();
}
diff --git
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeOption.java
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeOption.java
index d16d5c4f59..f9ff49cc41 100644
---
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeOption.java
+++
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeOption.java
@@ -17,17 +17,25 @@
package org.apache.seatunnel.connectors.seatunnel.fake.config;
+import
org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
+
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import java.util.List;
+import java.util.Map;
public class FakeOption {
- public static final Option<List<SeaTunnelRow>> ROWS =
+ public static final Option<List<Map<String, Object>>> TABLES_CONFIGS =
+ Options.key("tables_configs")
+ .type(new TypeReference<List<Map<String, Object>>>() {})
+ .noDefaultValue()
+ .withDescription("The multiple table config list of fake source");
+
+ public static final Option<List<Map<String, Object>>> ROWS =
Options.key("rows")
- .listType(SeaTunnelRow.class)
+ .type(new TypeReference<List<Map<String, Object>>>() {})
.noDefaultValue()
.withDescription("The row list of fake data output per degree of parallelism");
public static final Option<Integer> ROW_NUM =
diff --git
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/MultipleTableFakeSourceConfig.java
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/MultipleTableFakeSourceConfig.java
new file mode 100644
index 0000000000..051d88a88f
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/MultipleTableFakeSourceConfig.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.fake.config;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+
+import org.apache.commons.collections4.CollectionUtils;
+
+import com.google.common.collect.Lists;
+import lombok.Getter;
+
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class MultipleTableFakeSourceConfig implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ @Getter private List<FakeConfig> fakeConfigs;
+
+ public MultipleTableFakeSourceConfig(ReadonlyConfig fakeSourceRootConfig) {
+ if
(fakeSourceRootConfig.getOptional(FakeOption.TABLES_CONFIGS).isPresent()) {
+ parseFromConfigs(fakeSourceRootConfig);
+ } else {
+ parseFromConfig(fakeSourceRootConfig);
+ }
+ // validate
+ if (fakeConfigs.size() > 1) {
+ List<String> tableNames =
+ fakeConfigs.stream()
+ .map(FakeConfig::getCatalogTable)
+ .map(catalogTable ->
catalogTable.getTableId().toTablePath().toString())
+ .collect(Collectors.toList());
+ if (CollectionUtils.size(tableNames) != new
HashSet<>(tableNames).size()) {
+ throw new IllegalArgumentException("table name: " + tableNames + " must be unique");
+ }
+ }
+ }
+
+ private void parseFromConfigs(ReadonlyConfig readonlyConfig) {
+ List<ReadonlyConfig> readonlyConfigs =
+
readonlyConfig.getOptional(FakeOption.TABLES_CONFIGS).get().stream()
+ .map(ReadonlyConfig::fromMap)
+ .collect(Collectors.toList());
+ // Use the config outside if it's not set in sub config
+ fakeConfigs =
+ readonlyConfigs.stream()
+ .map(FakeConfig::buildWithConfig)
+ .collect(Collectors.toList());
+ }
+
+ private void parseFromConfig(ReadonlyConfig readonlyConfig) {
+ FakeConfig fakeConfig = FakeConfig.buildWithConfig(readonlyConfig);
+ fakeConfigs = Lists.newArrayList(fakeConfig);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
index 0d8b94f90f..2c9559078f 100644
---
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
+++
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
@@ -17,7 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.fake.source;
-import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
@@ -32,8 +32,6 @@ import
org.apache.seatunnel.connectors.seatunnel.fake.exception.FakeConnectorExc
import
org.apache.seatunnel.connectors.seatunnel.fake.utils.FakeDataRandomUtils;
import org.apache.seatunnel.format.json.JsonDeserializationSchema;
-import org.apache.commons.lang3.RandomUtils;
-
import java.io.IOException;
import java.lang.reflect.Array;
import java.util.ArrayList;
@@ -41,18 +39,21 @@ import java.util.HashMap;
import java.util.List;
public class FakeDataGenerator {
- private final SeaTunnelRowType rowType;
+ private final CatalogTable catalogTable;
private final FakeConfig fakeConfig;
private final JsonDeserializationSchema jsonDeserializationSchema;
private final FakeDataRandomUtils fakeDataRandomUtils;
+ private String tableId;
- public FakeDataGenerator(SeaTunnelRowType rowType, FakeConfig fakeConfig) {
- this.rowType = rowType;
+ public FakeDataGenerator(FakeConfig fakeConfig) {
+ this.catalogTable = fakeConfig.getCatalogTable();
+ this.tableId = catalogTable.getTableId().toTablePath().toString();
this.fakeConfig = fakeConfig;
this.jsonDeserializationSchema =
fakeConfig.getFakeRows() == null
? null
- : new JsonDeserializationSchema(false, false, rowType);
+ : new JsonDeserializationSchema(
+ false, false,
catalogTable.getSeaTunnelRowType());
this.fakeDataRandomUtils = new FakeDataRandomUtils(fakeConfig);
}
@@ -63,6 +64,7 @@ public class FakeDataGenerator {
if (rowData.getKind() != null) {
seaTunnelRow.setRowKind(RowKind.valueOf(rowData.getKind()));
}
+ seaTunnelRow.setTableId(tableId);
return seaTunnelRow;
} catch (IOException e) {
throw new
FakeConnectorException(CommonErrorCodeDeprecated.JSON_OPERATION_FAILED, e);
@@ -70,39 +72,35 @@ public class FakeDataGenerator {
}
private SeaTunnelRow randomRow() {
+ SeaTunnelRowType rowType = catalogTable.getSeaTunnelRowType();
String[] fieldNames = rowType.getFieldNames();
SeaTunnelDataType<?>[] fieldTypes = rowType.getFieldTypes();
List<Object> randomRow = new ArrayList<>(fieldNames.length);
for (SeaTunnelDataType<?> fieldType : fieldTypes) {
randomRow.add(randomColumnValue(fieldType));
}
- SeaTunnelRow row = new SeaTunnelRow(randomRow.toArray());
- if (!fakeConfig.getTableIdentifiers().isEmpty()) {
- row.setTableId(
- fakeConfig
- .getTableIdentifiers()
- .get(RandomUtils.nextInt(0,
fakeConfig.getTableIdentifiers().size()))
- .toTablePath()
- .toString());
- }
- return row;
+ SeaTunnelRow seaTunnelRow = new SeaTunnelRow(randomRow.toArray());
+ seaTunnelRow.setTableId(tableId);
+ return seaTunnelRow;
}
/**
* @param rowNum The number of pieces of data to be generated by the current task
- * @param output Data collection and distribution
+ * @return The generated data
*/
- public void collectFakedRows(int rowNum, Collector<SeaTunnelRow> output) {
+ public List<SeaTunnelRow> generateFakedRows(int rowNum) {
// Use manual configuration data preferentially
+ List<SeaTunnelRow> seaTunnelRows = new ArrayList<>();
if (fakeConfig.getFakeRows() != null) {
for (FakeConfig.RowData rowData : fakeConfig.getFakeRows()) {
- output.collect(convertRow(rowData));
+ seaTunnelRows.add(convertRow(rowData));
}
} else {
for (int i = 0; i < rowNum; i++) {
- output.collect(randomRow());
+ seaTunnelRows.add(randomRow());
}
}
+ return seaTunnelRows;
}
@SuppressWarnings("magicnumber")
diff --git
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
index d3bf9c6430..dd312bed10 100644
---
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
+++
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
@@ -26,17 +26,12 @@ import
org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.source.SupportColumnProjection;
import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
-import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig;
+import
org.apache.seatunnel.connectors.seatunnel.fake.config.MultipleTableFakeSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.fake.state.FakeSourceState;
-import org.apache.commons.collections4.CollectionUtils;
-
-import com.google.common.collect.Lists;
-
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
@@ -47,14 +42,10 @@ public class FakeSource
SupportColumnProjection {
private JobContext jobContext;
- private CatalogTable catalogTable;
- private FakeConfig fakeConfig;
-
- public FakeSource() {}
+ private final MultipleTableFakeSourceConfig multipleTableFakeSourceConfig;
public FakeSource(ReadonlyConfig readonlyConfig) {
- this.catalogTable = CatalogTableUtil.buildWithConfig(getPluginName(),
readonlyConfig);
- this.fakeConfig =
FakeConfig.buildWithConfig(readonlyConfig.toConfig());
+ this.multipleTableFakeSourceConfig = new
MultipleTableFakeSourceConfig(readonlyConfig);
}
@Override
@@ -66,29 +57,16 @@ public class FakeSource
@Override
public List<CatalogTable> getProducedCatalogTables() {
- // If tableNames is empty, means this is only one catalogTable, return
the original
- // catalogTable
- if (CollectionUtils.isEmpty(fakeConfig.getTableIdentifiers())) {
- return Lists.newArrayList(catalogTable);
- }
- // Otherwise, return the catalogTables with the tableNames
- return fakeConfig.getTableIdentifiers().stream()
- .map(
- tableIdentifier ->
- CatalogTable.of(
- TableIdentifier.of(
- getPluginName(),
tableIdentifier.toTablePath()),
- catalogTable.getTableSchema(),
- catalogTable.getOptions(),
- catalogTable.getPartitionKeys(),
- catalogTable.getComment()))
+ return multipleTableFakeSourceConfig.getFakeConfigs().stream()
+ .map(FakeConfig::getCatalogTable)
.collect(Collectors.toList());
}
@Override
public SourceSplitEnumerator<FakeSourceSplit, FakeSourceState>
createEnumerator(
- SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext)
throws Exception {
- return new FakeSourceSplitEnumerator(enumeratorContext, fakeConfig,
Collections.emptySet());
+ SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext) {
+ return new FakeSourceSplitEnumerator(
+ enumeratorContext, multipleTableFakeSourceConfig,
Collections.emptySet());
}
@Override
@@ -96,13 +74,15 @@ public class FakeSource
SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext,
FakeSourceState checkpointState) {
return new FakeSourceSplitEnumerator(
- enumeratorContext, fakeConfig,
checkpointState.getAssignedSplits());
+ enumeratorContext,
+ multipleTableFakeSourceConfig,
+ checkpointState.getAssignedSplits());
}
@Override
public SourceReader<SeaTunnelRow, FakeSourceSplit> createReader(
SourceReader.Context readerContext) {
- return new FakeSourceReader(readerContext,
catalogTable.getSeaTunnelRowType(), fakeConfig);
+ return new FakeSourceReader(readerContext,
multipleTableFakeSourceConfig);
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceFactory.java
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceFactory.java
index 91ef55950c..1578a01523 100644
---
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceFactory.java
@@ -20,7 +20,6 @@ package org.apache.seatunnel.connectors.seatunnel.fake.source;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceSplit;
-import org.apache.seatunnel.api.table.catalog.CatalogOptions;
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.Factory;
@@ -54,6 +53,7 @@ import static
org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.S
import static
org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.SPLIT_READ_INTERVAL;
import static
org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.STRING_FAKE_MODE;
import static
org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.STRING_TEMPLATE;
+import static
org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.TABLES_CONFIGS;
import static
org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.TIME_HOUR_TEMPLATE;
import static
org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.TIME_MINUTE_TEMPLATE;
import static
org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.TIME_SECOND_TEMPLATE;
@@ -70,7 +70,8 @@ public class FakeSourceFactory implements TableSourceFactory {
@Override
public OptionRule optionRule() {
return OptionRule.builder()
- .required(TableSchemaOptions.SCHEMA)
+ .optional(TABLES_CONFIGS)
+ .optional(TableSchemaOptions.SCHEMA)
.optional(STRING_FAKE_MODE)
.conditional(STRING_FAKE_MODE, FakeOption.FakeMode.TEMPLATE,
STRING_TEMPLATE)
.optional(TINYINT_FAKE_MODE)
@@ -98,8 +99,7 @@ public class FakeSourceFactory implements TableSourceFactory {
DATE_DAY_TEMPLATE,
TIME_HOUR_TEMPLATE,
TIME_MINUTE_TEMPLATE,
- TIME_SECOND_TEMPLATE,
- CatalogOptions.TABLE_NAMES)
+ TIME_SECOND_TEMPLATE)
.build();
}
diff --git
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
index 84b49722e9..016f336d37 100644
---
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
+++
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
@@ -21,8 +21,8 @@ import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig;
+import
org.apache.seatunnel.connectors.seatunnel.fake.config.MultipleTableFakeSourceConfig;
import lombok.extern.slf4j.Slf4j;
@@ -30,7 +30,9 @@ import java.time.Instant;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.stream.Collectors;
@Slf4j
public class FakeSourceReader implements SourceReader<SeaTunnelRow,
FakeSourceSplit> {
@@ -38,16 +40,34 @@ public class FakeSourceReader implements
SourceReader<SeaTunnelRow, FakeSourceSp
private final SourceReader.Context context;
private final Deque<FakeSourceSplit> splits = new
ConcurrentLinkedDeque<>();
- private final FakeConfig config;
- private final FakeDataGenerator fakeDataGenerator;
+ private final MultipleTableFakeSourceConfig multipleTableFakeSourceConfig;
+ // TableFullName to FakeDataGenerator
+ private final Map<String, FakeDataGenerator> fakeDataGeneratorMap;
private volatile boolean noMoreSplit;
+ private final long minSplitReadInterval;
private volatile long latestTimestamp = 0;
public FakeSourceReader(
- SourceReader.Context context, SeaTunnelRowType rowType, FakeConfig
fakeConfig) {
+ SourceReader.Context context,
+ MultipleTableFakeSourceConfig multipleTableFakeSourceConfig) {
this.context = context;
- this.config = fakeConfig;
- this.fakeDataGenerator = new FakeDataGenerator(rowType, fakeConfig);
+ this.multipleTableFakeSourceConfig = multipleTableFakeSourceConfig;
+ this.fakeDataGeneratorMap =
+ multipleTableFakeSourceConfig.getFakeConfigs().stream()
+ .collect(
+ Collectors.toMap(
+ fakeConfig ->
+ fakeConfig
+ .getCatalogTable()
+ .getTableId()
+ .toTablePath()
+ .toString(),
+ FakeDataGenerator::new));
+ this.minSplitReadInterval =
+ multipleTableFakeSourceConfig.getFakeConfigs().stream()
+ .map(FakeConfig::getSplitReadInterval)
+ .min(Integer::compareTo)
+ .get();
}
@Override
@@ -64,19 +84,23 @@ public class FakeSourceReader implements
SourceReader<SeaTunnelRow, FakeSourceSp
@SuppressWarnings("MagicNumber")
public void pollNext(Collector<SeaTunnelRow> output) throws
InterruptedException {
long currentTimestamp = Instant.now().toEpochMilli();
- if (currentTimestamp <= latestTimestamp +
config.getSplitReadInterval()) {
+ if (currentTimestamp <= latestTimestamp + minSplitReadInterval) {
return;
}
latestTimestamp = currentTimestamp;
synchronized (output.getCheckpointLock()) {
FakeSourceSplit split = splits.poll();
if (null != split) {
+ FakeDataGenerator fakeDataGenerator =
fakeDataGeneratorMap.get(split.getTableId());
// Randomly generated data are sent directly to the downstream operator
- fakeDataGenerator.collectFakedRows(split.getRowNum(), output);
+ List<SeaTunnelRow> seaTunnelRows =
+ fakeDataGenerator.generateFakedRows(split.getRowNum());
+ seaTunnelRows.forEach(output::collect);
log.info(
- "{} rows of data have been generated in split({}). Generation time: {}",
- split.getRowNum(),
+ "{} rows of data have been generated in split({}) for table {}. Generation time: {}",
+ seaTunnelRows.size(),
split.splitId(),
+ split.getTableId(),
latestTimestamp);
} else {
if (!noMoreSplit) {
diff --git
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplit.java
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplit.java
index 1b8e51ba6f..796b6423f5 100644
---
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplit.java
+++
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplit.java
@@ -25,12 +25,15 @@ import lombok.Data;
@Data
@AllArgsConstructor
public class FakeSourceSplit implements SourceSplit {
+
+ private String tableId;
+
private int splitId;
private int rowNum;
@Override
public String splitId() {
- return String.valueOf(splitId);
+ return tableId + "_" + splitId;
}
}
diff --git
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java
index 9e9356c1ee..102957d26e 100644
---
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java
+++
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.connectors.seatunnel.fake.source;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig;
+import
org.apache.seatunnel.connectors.seatunnel.fake.config.MultipleTableFakeSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.fake.state.FakeSourceState;
import lombok.extern.slf4j.Slf4j;
@@ -38,7 +39,7 @@ public class FakeSourceSplitEnumerator
private final SourceSplitEnumerator.Context<FakeSourceSplit>
enumeratorContext;
private final Map<Integer, Set<FakeSourceSplit>> pendingSplits;
- private final FakeConfig fakeConfig;
+ private final MultipleTableFakeSourceConfig multipleTableFakeSourceConfig;
/** Partitions that have been assigned to readers. */
private final Set<FakeSourceSplit> assignedSplits;
@@ -46,11 +47,11 @@ public class FakeSourceSplitEnumerator
public FakeSourceSplitEnumerator(
SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext,
- FakeConfig config,
+ MultipleTableFakeSourceConfig multipleTableFakeSourceConfig,
Set<FakeSourceSplit> assignedSplits) {
this.enumeratorContext = enumeratorContext;
this.pendingSplits = new HashMap<>();
- this.fakeConfig = config;
+ this.multipleTableFakeSourceConfig = multipleTableFakeSourceConfig;
this.assignedSplits = new HashSet<>(assignedSplits);
}
@@ -99,20 +100,29 @@ public class FakeSourceSplitEnumerator
}
@Override
- public void notifyCheckpointComplete(long checkpointId) throws Exception {}
+ public void notifyCheckpointComplete(long checkpointId) {}
private void discoverySplits() {
Set<FakeSourceSplit> allSplit = new HashSet<>();
log.info("Starting to calculate splits.");
int numReaders = enumeratorContext.currentParallelism();
- int readerRowNum = fakeConfig.getRowNum();
- int splitNum = fakeConfig.getSplitNum();
- int splitRowNum = (int) Math.ceil((double) readerRowNum / splitNum);
- for (int i = 0; i < numReaders; i++) {
- int index = i;
- for (int num = 0; num < readerRowNum; index += numReaders, num +=
splitRowNum) {
- allSplit.add(new FakeSourceSplit(index, Math.min(splitRowNum,
readerRowNum - num)));
+ for (FakeConfig fakeConfig :
multipleTableFakeSourceConfig.getFakeConfigs()) {
+ String tableId =
fakeConfig.getCatalogTable().getTableId().toTablePath().toString();
+ int readerRowNum = fakeConfig.getRowNum();
+ int splitNum = fakeConfig.getSplitNum();
+ int splitRowNum = (int) Math.ceil((double) readerRowNum /
splitNum);
+ for (int i = 0; i < numReaders; i++) {
+ int index = i;
+ for (int num = 0; num < readerRowNum; index += numReaders, num += splitRowNum) {
+ allSplit.add(
+ new FakeSourceSplit(
+ tableId, index, Math.min(splitRowNum,
readerRowNum - num)));
+ }
}
+ log.info(
+ "Calculated splits for table {} successfully, the size of splits is {}.",
+ tableId,
+ allSplit.size());
}
assignedSplits.forEach(allSplit::remove);
diff --git
a/seatunnel-connectors-v2/connector-fake/src/test/java/org/apache/seatunnel/connectors/seatunnel/fake/config/MultipleTableFakeSourceConfigTest.java
b/seatunnel-connectors-v2/connector-fake/src/test/java/org/apache/seatunnel/connectors/seatunnel/fake/config/MultipleTableFakeSourceConfigTest.java
new file mode 100644
index 0000000000..51745e8cfb
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-fake/src/test/java/org/apache/seatunnel/connectors/seatunnel/fake/config/MultipleTableFakeSourceConfigTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.fake.config;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Paths;
+
+class MultipleTableFakeSourceConfigTest {
+
+ @Test
+ void getFakeConfigs() throws URISyntaxException {
+ URL resource =
MultipleTableFakeSourceConfigTest.class.getResource("/multiple_table.conf");
+ Config config = ConfigFactory.parseFile(new
File(Paths.get(resource.toURI()).toString()));
+ ReadonlyConfig readonlyConfig =
ReadonlyConfig.fromConfig(config.getConfig("FakeSource"));
+ MultipleTableFakeSourceConfig multipleTableFakeSourceConfig =
+ new MultipleTableFakeSourceConfig(readonlyConfig);
+ Assertions.assertEquals(2,
multipleTableFakeSourceConfig.getFakeConfigs().size());
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-fake/src/test/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGeneratorTest.java
b/seatunnel-connectors-v2/connector-fake/src/test/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGeneratorTest.java
index 6e5962e3a7..bf962187f2 100644
---
a/seatunnel-connectors-v2/connector-fake/src/test/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGeneratorTest.java
+++
b/seatunnel-connectors-v2/connector-fake/src/test/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGeneratorTest.java
@@ -20,7 +20,7 @@ package org.apache.seatunnel.connectors.seatunnel.fake.source;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
-import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -36,7 +36,6 @@ import java.io.FileNotFoundException;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Paths;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -47,26 +46,15 @@ public class FakeDataGeneratorTest {
@ValueSource(strings = {"complex.schema.conf", "simple.schema.conf"})
public void testComplexSchemaParse(String conf)
throws FileNotFoundException, URISyntaxException {
- Config testConfig = getTestConfigFile(conf);
+ ReadonlyConfig testConfig = getTestConfigFile(conf);
SeaTunnelRowType seaTunnelRowType =
CatalogTableUtil.buildWithConfig(testConfig).getSeaTunnelRowType();
FakeConfig fakeConfig = FakeConfig.buildWithConfig(testConfig);
- FakeDataGenerator fakeDataGenerator = new
FakeDataGenerator(seaTunnelRowType, fakeConfig);
- List<SeaTunnelRow> seaTunnelRows = new ArrayList<>();
- fakeDataGenerator.collectFakedRows(
- fakeConfig.getRowNum(),
- new Collector<SeaTunnelRow>() {
- @Override
- public void collect(SeaTunnelRow record) {
- seaTunnelRows.add(record);
- }
-
- @Override
- public Object getCheckpointLock() {
- throw new UnsupportedOperationException();
- }
- });
+ FakeDataGenerator fakeDataGenerator = new
FakeDataGenerator(fakeConfig);
+ List<SeaTunnelRow> seaTunnelRows =
+ fakeDataGenerator.generateFakedRows(fakeConfig.getRowNum());
Assertions.assertNotNull(seaTunnelRows);
+
Assertions.assertEquals(seaTunnelRows.size(), 10);
for (SeaTunnelRow seaTunnelRow : seaTunnelRows) {
for (int i = 0; i < seaTunnelRowType.getFieldTypes().length; i++) {
@@ -109,29 +97,15 @@ public class FakeDataGeneratorTest {
List<SeaTunnelRow> expected =
Arrays.asList(row1, row2, row3, row1UpdateBefore,
row1UpdateAfter, row2Delete);
- Config testConfig = getTestConfigFile(conf);
- SeaTunnelRowType seaTunnelRowType =
-
CatalogTableUtil.buildWithConfig(testConfig).getSeaTunnelRowType();
+ ReadonlyConfig testConfig = getTestConfigFile(conf);
FakeConfig fakeConfig = FakeConfig.buildWithConfig(testConfig);
- FakeDataGenerator fakeDataGenerator = new
FakeDataGenerator(seaTunnelRowType, fakeConfig);
- List<SeaTunnelRow> seaTunnelRows = new ArrayList<>();
- fakeDataGenerator.collectFakedRows(
- fakeConfig.getRowNum(),
- new Collector<SeaTunnelRow>() {
- @Override
- public void collect(SeaTunnelRow record) {
- seaTunnelRows.add(record);
- }
-
- @Override
- public Object getCheckpointLock() {
- throw new UnsupportedOperationException();
- }
- });
+ FakeDataGenerator fakeDataGenerator = new
FakeDataGenerator(fakeConfig);
+ List<SeaTunnelRow> seaTunnelRows =
+ fakeDataGenerator.generateFakedRows(fakeConfig.getRowNum());
Assertions.assertIterableEquals(expected, seaTunnelRows);
}
- private Config getTestConfigFile(String configFile)
+ private ReadonlyConfig getTestConfigFile(String configFile)
throws FileNotFoundException, URISyntaxException {
if (!configFile.startsWith("/")) {
configFile = "/" + configFile;
@@ -143,6 +117,6 @@ public class FakeDataGeneratorTest {
String path = Paths.get(resource.toURI()).toString();
Config config = ConfigFactory.parseFile(new File(path));
assert config.hasPath("FakeSource");
- return config.getConfig("FakeSource");
+ return ReadonlyConfig.fromConfig(config.getConfig("FakeSource"));
}
}
diff --git
a/seatunnel-connectors-v2/connector-fake/src/test/resources/multiple_table.conf
b/seatunnel-connectors-v2/connector-fake/src/test/resources/multiple_table.conf
new file mode 100644
index 0000000000..d42413934e
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-fake/src/test/resources/multiple_table.conf
@@ -0,0 +1,76 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FakeSource {
+ tables_configs = [
+ {
+ row.num = 10
+ map.size = 10
+ array.size = 10
+ bytes.length = 10
+ string.length = 10
+ schema = {
+ table = "fake.table1"
+ fields {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(30, 8)"
+ c_null = "null"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ }
+ }
+ },
+ {
+ row.num = 10
+ map.size = 10
+ array.size = 10
+ bytes.length = 10
+ string.length = 10
+ schema = {
+ table = "fake.table2"
+ fields {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(30, 8)"
+ c_null = "null"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ }
+ }
+ }
+ ]
+ result_table_name = "fake"
+}
\ No newline at end of file
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertor.java
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertor.java
index 2983c05c6c..110222ae32 100644
---
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertor.java
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertor.java
@@ -209,9 +209,9 @@ public class MaxComputeDataTypeConvertor implements DataTypeConvertor<TypeInfo>
case DATE:
return LocalTimeType.LOCAL_DATE_TYPE;
case DATETIME:
- return LocalTimeType.LOCAL_DATE_TIME_TYPE;
- case TIMESTAMP:
return LocalTimeType.LOCAL_TIME_TYPE;
+ case TIMESTAMP:
+ return LocalTimeType.LOCAL_DATE_TIME_TYPE;
case BOOLEAN:
return BasicType.BOOLEAN_TYPE;
case VOID:
diff --git
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigBuilder.java
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigBuilder.java
index 9ef0c2020d..0859690feb 100644
---
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigBuilder.java
+++
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigBuilder.java
@@ -65,7 +65,7 @@ public class ConfigBuilder {
adapterSupplier
.map(adapter -> of(adapter, filePath))
.orElseGet(() -> ofInner(filePath));
- log.info("Parsed config file: {}", config.root().render(CONFIG_RENDER_OPTIONS));
+ log.info("Parsed config file: \n{}", config.root().render(CONFIG_RENDER_OPTIONS));
return config;
}
@@ -84,7 +84,7 @@ public class ConfigBuilder {
if (!isEncrypt) {
config = ConfigShadeUtils.decryptConfig(config);
}
- log.info("Parsed config file: {}", config.root().render(CONFIG_RENDER_OPTIONS));
+ log.info("Parsed config file: \n{}", config.root().render(CONFIG_RENDER_OPTIONS));
return config;
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_assert_with_tablenames.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_assert_with_tablenames.conf
index 32fb751610..5b526ba337 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_assert_with_tablenames.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_assert_with_tablenames.conf
@@ -22,40 +22,80 @@ env {
source {
FakeSource {
- row.num = 100
- table-names = ["test.table1", "test.table2"]
- schema = {
- columns = [
- {
- name = id
- type = bigint
- }
- {
- name = name
- type = string
- }
- {
- name = age
- type = int
+ tables_configs = [
+ {
+ row.num = 100
+ schema = {
+ table = "test.table1"
+ columns = [
+ {
+ name = id
+ type = bigint
+ }
+ {
+ name = name
+ type = string
+ }
+ {
+ name = age
+ type = int
+ }
+ ]
+ primaryKey = {
+ name = "primary key"
+ columnNames = ["id"]
+ }
+ constraintKeys = [
+ {
+ constraintName = "unique_name"
+ constraintType = UNIQUE_KEY
+ constraintColumns = [
+ {
+ columnName = "id"
+ sortType = ASC
+ }
+ ]
+ }
+ ]
}
- ]
- primaryKey = {
- name = "primary key"
- columnNames = ["id"]
- }
- constraintKeys = [
- {
- constraintName = "unique_name"
- constraintType = UNIQUE_KEY
- constraintColumns = [
- {
- columnName = "id"
- sortType = ASC
- }
- ]
+ },
+ {
+ row.num = 100
+ schema = {
+ table = "test.table2"
+ columns = [
+ {
+ name = id
+ type = bigint
+ }
+ {
+ name = name
+ type = string
+ }
+ {
+ name = age
+ type = int
+ }
+ ]
+ primaryKey = {
+ name = "primary key"
+ columnNames = ["id"]
+ }
+ constraintKeys = [
+ {
+ constraintName = "unique_name"
+ constraintType = UNIQUE_KEY
+ constraintColumns = [
+ {
+ columnName = "id"
+ sortType = ASC
+ }
+ ]
+ }
+ ]
}
- ]
- }
+ }
+ ]
result_table_name = "fake"
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_console_with_multitable_mode.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_console_with_multitable_mode.conf
index 22d68207ea..c7bf0adb10 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_console_with_multitable_mode.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_console_with_multitable_mode.conf
@@ -22,24 +22,48 @@ env {
source {
FakeSource {
- row.num = 100
- table-names = ["test.table1", "test.table2"]
- schema = {
- columns = [
+ tables_configs = [
{
- name = id
- type = bigint
- }
- {
- name = name
- type = string
- }
+ row.num = 100
+ schema = {
+ table = "test.table1"
+ columns = [
+ {
+ name = id
+ type = bigint
+ }
+ {
+ name = name
+ type = string
+ }
+ {
+ name = age
+ type = int
+ }
+ ]
+ }
+ },
{
- name = age
- type = int
+ row.num = 100
+ schema = {
+ table = "test.table2"
+ columns = [
+ {
+ name = id
+ type = bigint
+ }
+ {
+ name = name
+ type = string
+ }
+ {
+ name = age
+ type = int
+ }
+ ]
+ }
}
- ]
- }
+ ]
result_table_name = "fake"
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarBatchIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarBatchIT.java
index 092f37f9bc..466eff946e 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarBatchIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarBatchIT.java
@@ -17,9 +17,10 @@
package org.apache.seatunnel.e2e.connector.pulsar;
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
-import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
@@ -52,12 +53,13 @@ import org.testcontainers.shaded.org.awaitility.Awaitility;
import org.testcontainers.utility.DockerImageName;
import org.testcontainers.utility.DockerLoggerFactory;
-import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
+import java.io.File;
import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Paths;
import java.time.Duration;
-import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
@@ -144,35 +146,23 @@ public class PulsarBatchIT extends TestSuiteBase implements TestResource {
private void produceData() {
try {
- FakeConfig fakeConfig = FakeConfig.buildWithConfig(ConfigFactory.empty());
- FakeDataGenerator fakeDataGenerator =
- new FakeDataGenerator(SEATUNNEL_ROW_TYPE, fakeConfig);
- SimpleCollector simpleCollector = new SimpleCollector();
- fakeDataGenerator.collectFakedRows(100, simpleCollector);
+ URL resource = PulsarBatchIT.class.getResource("/fake_source.conf");
+ Config config =
+ ConfigFactory.parseFile(new File(Paths.get(resource.toURI()).toString()));
+
+ FakeConfig fakeConfig = FakeConfig.buildWithConfig(ReadonlyConfig.fromConfig(config));
+ FakeDataGenerator fakeDataGenerator = new FakeDataGenerator(fakeConfig);
+ List<SeaTunnelRow> seaTunnelRows = fakeDataGenerator.generateFakedRows(100);
JsonSerializationSchema jsonSerializationSchema =
new JsonSerializationSchema(SEATUNNEL_ROW_TYPE);
- for (SeaTunnelRow seaTunnelRow : simpleCollector.getList()) {
+ for (SeaTunnelRow seaTunnelRow : seaTunnelRows) {
producer.send(jsonSerializationSchema.serialize(seaTunnelRow));
}
- } catch (PulsarClientException e) {
+ } catch (Exception e) {
throw new RuntimeException("produce data error", e);
}
}
- private static class SimpleCollector implements Collector<SeaTunnelRow> {
- @Getter private List<SeaTunnelRow> list = new ArrayList<>();
-
- @Override
- public void collect(SeaTunnelRow record) {
- list.add(record);
- }
-
- @Override
- public Object getCheckpointLock() {
- return null;
- }
- }
-
@TestTemplate
void testPulsarBatch(TestContainer container) throws IOException, InterruptedException {
Container.ExecResult execResult =
container.executeJob("/batch_pulsar_to_console.conf");
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_console_with_multitable_mode.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/fake_source.conf
similarity index 63%
copy from
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_console_with_multitable_mode.conf
copy to
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/fake_source.conf
index 22d68207ea..5fa5b073d7 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_console_with_multitable_mode.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/fake_source.conf
@@ -15,36 +15,22 @@
# limitations under the License.
#
-env {
- execution.parallelism = 1
- job.mode = "BATCH"
-}
+schema = {
+ fields {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(38, 8)"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
-source {
- FakeSource {
- row.num = 100
- table-names = ["test.table1", "test.table2"]
- schema = {
- columns = [
- {
- name = id
- type = bigint
- }
- {
- name = name
- type = string
- }
- {
- name = age
- type = int
- }
- ]
}
- result_table_name = "fake"
- }
-}
-
-sink{
- Console {
- }
}
\ No newline at end of file