This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 652921fb75 Support config tableIdentifier for schema (#5628)
652921fb75 is described below
commit 652921fb7523c4deb86f972352daf86795c6b9a0
Author: Wenjun Ruan <[email protected]>
AuthorDate: Tue Oct 17 14:23:56 2023 +0800
Support config tableIdentifier for schema (#5628)
---
docs/en/concept/schema-feature.md | 18 +++++++++++
docs/en/connector-v2/source/FakeSource.md | 16 +++++++++-
.../seatunnel/api/table/catalog/CatalogTable.java | 8 +++--
.../api/table/catalog/CatalogTableUtil.java | 37 +++++++++++++++++++---
.../seatunnel/api/table/catalog/TablePath.java | 16 +++++++++-
.../table/catalog/schema/TableSchemaOptions.java | 21 ++++++++++++
.../assertion/rule/AssertCatalogTableRule.java | 27 ++++++++++++++++
.../rule/AssertCatalogTableRuleParser.java | 19 +++++++++++
.../seatunnel/assertion/sink/AssertConfig.java | 7 ++++
.../seatunnel/fake/config/FakeConfig.java | 2 +-
.../seatunnel/fake/source/FakeSource.java | 32 +++++++++++--------
.../fake_to_assert_with_catalogtable.conf | 6 ++++
12 files changed, 187 insertions(+), 22 deletions(-)
diff --git a/docs/en/concept/schema-feature.md
b/docs/en/concept/schema-feature.md
index 01b5072800..4c95e79fef 100644
--- a/docs/en/concept/schema-feature.md
+++ b/docs/en/concept/schema-feature.md
@@ -11,6 +11,9 @@ We can use SchemaOptions to define schema, the SchemaOptions
contains some confi
```
schema = {
+ table = "database.schema.table"
+ schema_first = false
+ comment = "comment"
columns = [
...
]
@@ -24,6 +27,20 @@ schema = {
}
```
+### table
+
+The table full name of the table identifier which the schema belongs to, it
contains database, schema, table name. e.g. `database.schema.table`,
`database.table`, `table`.
+
+### schema_first
+
+Default is false.
+
+If the schema_first is true, the schema will be used first, this means if we
set `table = "a.b"`, `a` will be parsed as schema rather than database, then we
can support write `table = "schema.table"`.
+
+### comment
+
+The comment of the CatalogTable which the schema belongs to.
+
### Columns
Columns is a list of config used to define the column in schema, each column
can contains name, type, nullable, defaultValue, comment field.
@@ -131,6 +148,7 @@ source {
result_table_name = "fake"
row.num = 16
schema {
+ table = "FakeDatabase.FakeTable"
columns = [
{
name = id
diff --git a/docs/en/connector-v2/source/FakeSource.md
b/docs/en/connector-v2/source/FakeSource.md
index b7d3d3d1f5..d23fc1a586 100644
--- a/docs/en/connector-v2/source/FakeSource.md
+++ b/docs/en/connector-v2/source/FakeSource.md
@@ -265,7 +265,21 @@ The template list of double type that connector generated,
if user configured it
### table-names
-The table list that connector generated, used to simulate multi-table scenarios
+The table list that connector generated, used to simulate multi-table
scenarios.
+
+This option will override the `table` option in the `schema` option.
+For example, if you configure the `table-names` option as follows, the
connector will generate data for the `test.table1` and `test.table2` tables,
the `database.schema.table` will be drop.
+
+```agsl
+FakeSource {
+ table-names = ["test.table1", "test.table2"]
+ schema = {
+ table = "database.schema.table"
+ ...
+ }
+ ...
+}
+```
### common options
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTable.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTable.java
index 5b1f124863..d4c8d1e332 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTable.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTable.java
@@ -48,7 +48,8 @@ public final class CatalogTable implements Serializable {
Map<String, String> options,
List<String> partitionKeys,
String comment) {
- return new CatalogTable(tableId, tableSchema, options, partitionKeys,
comment);
+ return new CatalogTable(
+ tableId, tableSchema, options, partitionKeys, comment,
tableId.getCatalogName());
}
public static CatalogTable of(
@@ -67,7 +68,7 @@ public final class CatalogTable implements Serializable {
Map<String, String> options,
List<String> partitionKeys,
String comment) {
- this(tableId, tableSchema, options, partitionKeys, comment, "");
+ this(tableId, tableSchema, options, partitionKeys, comment,
tableId.getCatalogName());
}
private CatalogTable(
@@ -127,6 +128,9 @@ public final class CatalogTable implements Serializable {
+ ", comment='"
+ comment
+ '\''
+ + ", catalogName='"
+ + catalogName
+ + '\''
+ '}';
}
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
index 880d97ca46..b258faaf4d 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
@@ -31,6 +31,8 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.utils.SeaTunnelException;
+import org.apache.commons.lang3.StringUtils;
+
import lombok.extern.slf4j.Slf4j;
import java.io.Serializable;
@@ -135,7 +137,7 @@ public class CatalogTableUtil implements Serializable {
if (schemaMap.isEmpty()) {
throw new SeaTunnelException("Schema config can not be empty");
}
- CatalogTable catalogTable =
CatalogTableUtil.buildWithConfig(readonlyConfig);
+ CatalogTable catalogTable =
CatalogTableUtil.buildWithConfig(factoryId, readonlyConfig);
return Collections.singletonList(catalogTable);
}
@@ -190,6 +192,9 @@ public class CatalogTableUtil implements Serializable {
}
}
+ // We need to use buildWithConfig(String catalogName, ReadonlyConfig
readonlyConfig);
+ // Since this method will not inject the correct catalogName into
CatalogTable
+ @Deprecated
public static List<CatalogTable> convertDataTypeToCatalogTables(
SeaTunnelDataType<?> seaTunnelDataType, String tableId) {
List<CatalogTable> catalogTables;
@@ -210,18 +215,42 @@ public class CatalogTableUtil implements Serializable {
}
public static CatalogTable buildWithConfig(ReadonlyConfig readonlyConfig) {
+ return buildWithConfig("", readonlyConfig);
+ }
+
+ public static CatalogTable buildWithConfig(String catalogName,
ReadonlyConfig readonlyConfig) {
if (readonlyConfig.get(TableSchemaOptions.SCHEMA) == null) {
throw new RuntimeException(
"Schema config need option [schema], please correct your
config first");
}
TableSchema tableSchema = new
ReadonlyConfigParser().parse(readonlyConfig);
+
+ ReadonlyConfig schemaConfig =
+ readonlyConfig
+ .getOptional(TableSchemaOptions.SCHEMA)
+ .map(ReadonlyConfig::fromMap)
+ .orElseThrow(
+ () -> new IllegalArgumentException("Schema
config can't be null"));
+
+ TablePath tablePath;
+ if (StringUtils.isNotEmpty(
+
schemaConfig.get(TableSchemaOptions.TableIdentifierOptions.TABLE))) {
+ tablePath =
+ TablePath.of(
+
schemaConfig.get(TableSchemaOptions.TableIdentifierOptions.TABLE),
+ schemaConfig.get(
+
TableSchemaOptions.TableIdentifierOptions.SCHEMA_FIRST));
+ } else {
+ tablePath = TablePath.EMPTY;
+ }
+
return CatalogTable.of(
- // TODO: other table info
- TableIdentifier.of("", "", ""),
+ TableIdentifier.of(catalogName, tablePath),
tableSchema,
new HashMap<>(),
+ // todo: add partitionKeys?
new ArrayList<>(),
- "");
+
readonlyConfig.get(TableSchemaOptions.TableIdentifierOptions.COMMENT));
}
public static SeaTunnelRowType buildSimpleTextSchema() {
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TablePath.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TablePath.java
index 358e873b99..d123d3a330 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TablePath.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TablePath.java
@@ -34,12 +34,26 @@ public final class TablePath implements Serializable {
private final String schemaName;
private final String tableName;
+ public static final TablePath EMPTY = TablePath.of(null, null, null);
+
public static TablePath of(String fullName) {
+ return of(fullName, false);
+ }
+
+ public static TablePath of(String fullName, boolean schemaFirst) {
String[] paths = fullName.split("\\.");
+ if (paths.length == 1) {
+ return of(null, paths[0]);
+ }
+
if (paths.length == 2) {
- return of(paths[0], paths[1]);
+ if (schemaFirst) {
+ return of(null, paths[0], paths[1]);
+ }
+ return of(paths[0], null, paths[1]);
}
+
if (paths.length == 3) {
return of(paths[0], paths[1], paths[2]);
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/TableSchemaOptions.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/TableSchemaOptions.java
index 19f858b98f..492fe1909c 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/TableSchemaOptions.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/TableSchemaOptions.java
@@ -28,6 +28,27 @@ import java.util.Map;
public class TableSchemaOptions {
+ public static class TableIdentifierOptions {
+
+ public static final Option<Boolean> SCHEMA_FIRST =
+ Options.key("schema_first")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Parse Schema First from table");
+
+ public static final Option<String> TABLE =
+ Options.key("table")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("SeaTunnel Schema Full Table Name");
+
+ public static final Option<String> COMMENT =
+ Options.key("comment")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("SeaTunnel Schema Table Comment");
+ }
+
public static final Option<Map<String, Object>> SCHEMA =
Options.key("schema")
.type(new TypeReference<Map<String, Object>>() {})
diff --git
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertCatalogTableRule.java
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertCatalogTableRule.java
index 5245663b54..9d4e35493a 100644
---
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertCatalogTableRule.java
+++
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertCatalogTableRule.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.ConstraintKey;
import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import
org.apache.seatunnel.connectors.seatunnel.assertion.exception.AssertConnectorException;
@@ -48,6 +49,9 @@ public class AssertCatalogTableRule implements Serializable {
@OptionMark(description = "column rule")
private AssertColumnRule columnRule;
+ @OptionMark(description = "tableIdentifier rule")
+ private AssertTableIdentifierRule tableIdentifierRule;
+
public void checkRule(CatalogTable catalogTable) {
TableSchema tableSchema = catalogTable.getTableSchema();
if (tableSchema == null) {
@@ -62,6 +66,9 @@ public class AssertCatalogTableRule implements Serializable {
if (columnRule != null) {
columnRule.checkRule(tableSchema.getColumns());
}
+ if (tableIdentifierRule != null) {
+ tableIdentifierRule.checkRule(catalogTable.getTableId());
+ }
}
@Data
@@ -138,4 +145,24 @@ public class AssertCatalogTableRule implements
Serializable {
}
}
}
+
+ @Data
+ @AllArgsConstructor
+ public static class AssertTableIdentifierRule implements Serializable {
+
+ private TableIdentifier tableIdentifier;
+
+ public void checkRule(TableIdentifier actiualTableIdentifier) {
+ if (actiualTableIdentifier == null) {
+ throw new AssertConnectorException(CATALOG_TABLE_FAILED,
"tableIdentifier is null");
+ }
+ if (!actiualTableIdentifier.equals(tableIdentifier)) {
+ throw new AssertConnectorException(
+ CATALOG_TABLE_FAILED,
+ String.format(
+ "tableIdentifier: %s is not equal to %s",
+ actiualTableIdentifier, tableIdentifier));
+ }
+ }
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertCatalogTableRuleParser.java
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertCatalogTableRuleParser.java
index dc5063a2c8..fee135d4f1 100644
---
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertCatalogTableRuleParser.java
+++
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertCatalogTableRuleParser.java
@@ -23,6 +23,8 @@ import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.ConstraintKey;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.catalog.SeaTunnelDataTypeConvertorUtil;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
import org.apache.seatunnel.common.config.TypesafeConfigUtils;
@@ -46,6 +48,9 @@ import static
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertCon
import static
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.PRIMARY_KEY_COLUMNS;
import static
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.PRIMARY_KEY_NAME;
import static
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.PRIMARY_KEY_RULE;
+import static
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.TableIdentifierRule.TABLE_IDENTIFIER_CATALOG_NAME;
+import static
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.TableIdentifierRule.TABLE_IDENTIFIER_RULE;
+import static
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.TableIdentifierRule.TABLE_IDENTIFIER_TABLE_NAME;
public class AssertCatalogTableRuleParser {
@@ -55,6 +60,7 @@ public class AssertCatalogTableRuleParser {
parsePrimaryKeyRule(catalogTableRule).ifPresent(tableRule::setPrimaryKeyRule);
parseConstraintKeyRule(catalogTableRule).ifPresent(tableRule::setConstraintKeyRule);
parseColumnRule(catalogTableRule).ifPresent(tableRule::setColumnRule);
+
parseTableIdentifierRule(catalogTableRule).ifPresent(tableRule::setTableIdentifierRule);
return tableRule;
}
@@ -156,4 +162,17 @@ public class AssertCatalogTableRuleParser {
.collect(Collectors.toList());
return Optional.of(new
AssertCatalogTableRule.AssertConstraintKeyRule(constraintKeys));
}
+
+ private Optional<AssertCatalogTableRule.AssertTableIdentifierRule>
parseTableIdentifierRule(
+ Config catalogTableRule) {
+ if (!catalogTableRule.hasPath(TABLE_IDENTIFIER_RULE)) {
+ return Optional.empty();
+ }
+ Config tableIdentifierRule =
catalogTableRule.getConfig(TABLE_IDENTIFIER_RULE);
+ TableIdentifier tableIdentifier =
+ TableIdentifier.of(
+
tableIdentifierRule.getString(TABLE_IDENTIFIER_CATALOG_NAME),
+
TablePath.of(tableIdentifierRule.getString(TABLE_IDENTIFIER_TABLE_NAME)));
+ return Optional.of(new
AssertCatalogTableRule.AssertTableIdentifierRule(tableIdentifier));
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertConfig.java
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertConfig.java
index 5d9dddc58d..d86c034d68 100644
---
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertConfig.java
+++
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertConfig.java
@@ -64,6 +64,13 @@ public class AssertConfig {
public static final String COLUMN_DEFAULT_VALUE = "default_value";
public static final String COLUMN_COMMENT = "comment";
+ public static class TableIdentifierRule {
+ public static final String TABLE_IDENTIFIER_RULE =
"table_identifier_rule";
+
+ public static final String TABLE_IDENTIFIER_CATALOG_NAME =
"catalog_name";
+ public static final String TABLE_IDENTIFIER_TABLE_NAME = "table";
+ }
+
public static final Option<String> COMMENT =
Options.key("comment")
.stringType()
diff --git
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java
index f3aeb8d88e..e298fbea6d 100644
---
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java
+++
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java
@@ -416,7 +416,7 @@ public class FakeConfig implements Serializable {
List<String> tableNames =
config.getStringList(CatalogOptions.TABLE_NAMES.key());
List<TableIdentifier> tableIdentifiers = new
ArrayList<>(tableNames.size());
for (String tableName : tableNames) {
- tableIdentifiers.add(TableIdentifier.of("fake",
TablePath.of(tableName)));
+ tableIdentifiers.add(TableIdentifier.of("FakeSource",
TablePath.of(tableName)));
}
builder.tableIdentifiers(tableIdentifiers);
}
diff --git
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
index 15118d2bff..e634347fde 100644
---
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
+++
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
@@ -30,6 +30,7 @@ import
org.apache.seatunnel.api.source.SupportColumnProjection;
import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -41,6 +42,8 @@ import
org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig;
import
org.apache.seatunnel.connectors.seatunnel.fake.exception.FakeConnectorException;
import org.apache.seatunnel.connectors.seatunnel.fake.state.FakeSourceState;
+import org.apache.commons.collections4.CollectionUtils;
+
import com.google.auto.service.AutoService;
import com.google.common.collect.Lists;
@@ -61,7 +64,7 @@ public class FakeSource
public FakeSource() {}
public FakeSource(ReadonlyConfig readonlyConfig) {
- this.catalogTable = CatalogTableUtil.buildWithConfig(readonlyConfig);
+ this.catalogTable = CatalogTableUtil.buildWithConfig(getPluginName(),
readonlyConfig);
this.fakeConfig =
FakeConfig.buildWithConfig(readonlyConfig.toConfig());
}
@@ -74,20 +77,23 @@ public class FakeSource
@Override
public List<CatalogTable> getProducedCatalogTables() {
- if (fakeConfig.getTableIdentifiers().isEmpty()) {
+ // If tableNames is empty, means this is only one catalogTable, return
the original
+ // catalogTable
+ if (CollectionUtils.isEmpty(fakeConfig.getTableIdentifiers())) {
return Lists.newArrayList(catalogTable);
- } else {
- return fakeConfig.getTableIdentifiers().stream()
- .map(
- tableIdentifier ->
- CatalogTable.of(
- tableIdentifier,
- catalogTable.getTableSchema(),
- catalogTable.getOptions(),
- catalogTable.getPartitionKeys(),
- catalogTable.getComment()))
- .collect(Collectors.toList());
}
+ // Otherwise, return the catalogTables with the tableNames
+ return fakeConfig.getTableIdentifiers().stream()
+ .map(
+ tableIdentifier ->
+ CatalogTable.of(
+ TableIdentifier.of(
+ getPluginName(),
tableIdentifier.toTablePath()),
+ catalogTable.getTableSchema(),
+ catalogTable.getOptions(),
+ catalogTable.getPartitionKeys(),
+ catalogTable.getComment()))
+ .collect(Collectors.toList());
}
@Override
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_assert_with_catalogtable.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_assert_with_catalogtable.conf
index 4a98662b76..ee33778556 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_assert_with_catalogtable.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_assert_with_catalogtable.conf
@@ -24,6 +24,7 @@ source {
FakeSource {
row.num = 100
schema = {
+ table = "test.fakeTable"
columns = [
{
name = id
@@ -63,6 +64,11 @@ sink{
Assert {
rules {
catalog_table_rule {
+ table_identifier_rule = {
+ catalog_name = "FakeSource"
+ table = "test.fakeTable"
+ }
+
primary_key_rule = {
primary_key_name = "primary key"
primary_key_columns = ["id"]