This is an automated email from the ASF dual-hosted git repository.
wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 2489f6446b [Improve][Connector] Add multi-table sink option check (#7360)
2489f6446b is described below
commit 2489f6446bae5b971a26e4cd3094f7d7af9d0208
Author: hailin0 <[email protected]>
AuthorDate: Mon Aug 12 13:36:15 2024 +0800
[Improve][Connector] Add multi-table sink option check (#7360)
* [Improve][Connector] Add multi-table sink option check
* fix
---
.../org/apache/seatunnel/api/sink/SinkCommonOptions.java | 2 +-
.../seatunnel/assertion/sink/AssertSinkFactory.java | 6 +++++-
.../seatunnel/console/sink/ConsoleSinkFactory.java | 6 +++++-
.../connectors/druid/sink/DruidSinkFactory.java | 6 +++++-
.../elasticsearch/sink/ElasticsearchSinkFactory.java | 4 +++-
.../seatunnel/file/local/sink/LocalFileSinkFactory.java | 2 ++
.../seatunnel/file/oss/sink/OssFileSinkFactory.java | 2 ++
.../seatunnel/file/s3/sink/S3FileSinkFactory.java | 2 ++
.../connectors/seatunnel/http/sink/HttpSinkFactory.java | 2 ++
.../connectors/seatunnel/hudi/sink/HudiSinkFactory.java | 4 +++-
.../seatunnel/iceberg/sink/IcebergSinkFactory.java | 4 +++-
.../seatunnel/influxdb/sink/InfluxDBSinkFactory.java | 4 +++-
.../connectors/seatunnel/kudu/sink/KuduSinkFactory.java | 2 ++
.../seatunnel/paimon/sink/PaimonSinkFactory.java | 4 +++-
.../seatunnel/redis/sink/RedisSinkFactory.java | 4 +++-
.../api/connector/ConnectorSpecificationCheckTest.java | 16 ++++++++++++++--
16 files changed, 58 insertions(+), 12 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommonOptions.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommonOptions.java
index 598193d695..9c6538ac87 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommonOptions.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommonOptions.java
@@ -28,5 +28,5 @@ public class SinkCommonOptions {
Options.key("multi_table_sink_replica")
.intType()
.defaultValue(1)
- .withDescription("The replica number of multi table sink");
+ .withDescription("The replica number of multi table sink
writer");
}
diff --git
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSinkFactory.java
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSinkFactory.java
index 376863dc18..ae174d9857 100644
---
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSinkFactory.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.assertion.sink;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.sink.SinkCommonOptions;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
@@ -37,7 +38,10 @@ public class AssertSinkFactory implements TableSinkFactory {
@Override
public OptionRule optionRule() {
- return OptionRule.builder().required(RULES).build();
+ return OptionRule.builder()
+ .required(RULES)
+ .optional(SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
+ .build();
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkFactory.java
b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkFactory.java
index 169a281fc1..fa5c7deae9 100644
---
a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkFactory.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.sink.SinkCommonOptions;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
@@ -52,7 +53,10 @@ public class ConsoleSinkFactory implements TableSinkFactory {
@Override
public OptionRule optionRule() {
- return OptionRule.builder().optional(LOG_PRINT_DATA,
LOG_PRINT_DELAY).build();
+ return OptionRule.builder()
+ .optional(
+ LOG_PRINT_DATA, LOG_PRINT_DELAY,
SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
+ .build();
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSinkFactory.java
b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSinkFactory.java
index 0c6824b521..3199d3d66f 100644
---
a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSinkFactory.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.connectors.druid.sink;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.sink.SinkCommonOptions;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
@@ -40,7 +41,10 @@ public class DruidSinkFactory implements TableSinkFactory {
@Override
public OptionRule optionRule() {
- return OptionRule.builder().required(COORDINATOR_URL,
DATASOURCE).build();
+ return OptionRule.builder()
+ .required(COORDINATOR_URL, DATASOURCE)
+ .optional(SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
+ .build();
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java
index 56ec1d0ab7..b290a63c44 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java
@@ -19,6 +19,7 @@ package
org.apache.seatunnel.connectors.seatunnel.elasticsearch.sink;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.sink.SinkCommonOptions;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.connector.TableSink;
@@ -69,7 +70,8 @@ public class ElasticsearchSinkFactory implements
TableSinkFactory {
TLS_KEY_STORE_PATH,
TLS_KEY_STORE_PASSWORD,
TLS_TRUST_STORE_PATH,
- TLS_TRUST_STORE_PASSWORD)
+ TLS_TRUST_STORE_PASSWORD,
+ SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
.build();
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java
index e8ee8e436d..1a9bcc1734 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java
@@ -19,6 +19,7 @@ package
org.apache.seatunnel.connectors.seatunnel.file.local.sink;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.sink.SinkCommonOptions;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
@@ -48,6 +49,7 @@ public class LocalFileSinkFactory extends
BaseMultipleTableFileSinkFactory {
.optional(BaseSinkConfig.FILE_FORMAT_TYPE)
.optional(BaseSinkConfig.SCHEMA_SAVE_MODE)
.optional(BaseSinkConfig.DATA_SAVE_MODE)
+ .optional(SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
.conditional(
BaseSinkConfig.FILE_FORMAT_TYPE,
FileFormat.TEXT,
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
index 5d6cb649f2..6fd3088ddc 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
@@ -19,6 +19,7 @@ package
org.apache.seatunnel.connectors.seatunnel.file.oss.sink;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.sink.SinkCommonOptions;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
@@ -102,6 +103,7 @@ public class OssFileSinkFactory extends
BaseMultipleTableFileSinkFactory {
.optional(BaseSinkConfig.DATE_FORMAT)
.optional(BaseSinkConfig.DATETIME_FORMAT)
.optional(BaseSinkConfig.TIME_FORMAT)
+ .optional(SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
.build();
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java
index 4ac9f45915..5c231443e9 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java
@@ -19,6 +19,7 @@ package
org.apache.seatunnel.connectors.seatunnel.file.s3.sink;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.sink.SinkCommonOptions;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
@@ -103,6 +104,7 @@ public class S3FileSinkFactory implements TableSinkFactory {
.optional(BaseSinkConfig.DATETIME_FORMAT)
.optional(BaseSinkConfig.TIME_FORMAT)
.optional(BaseSinkConfig.TMP_PATH)
+ .optional(SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
.build();
}
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkFactory.java
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkFactory.java
index 539563ecb6..313d26dd3f 100644
---
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkFactory.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.http.sink;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.sink.SinkCommonOptions;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
@@ -49,6 +50,7 @@ public class HttpSinkFactory implements TableSinkFactory {
.optional(HttpConfig.RETRY)
.optional(HttpConfig.RETRY_BACKOFF_MULTIPLIER_MS)
.optional(HttpConfig.RETRY_BACKOFF_MAX_MS)
+ .optional(SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
.build();
}
}
diff --git
a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSinkFactory.java
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSinkFactory.java
index d38785de02..7697842f82 100644
---
a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSinkFactory.java
@@ -19,6 +19,7 @@
package org.apache.seatunnel.connectors.seatunnel.hudi.sink;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.sink.SinkCommonOptions;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
@@ -61,7 +62,8 @@ public class HudiSinkFactory implements TableSinkFactory {
INSERT_SHUFFLE_PARALLELISM,
UPSERT_SHUFFLE_PARALLELISM,
MIN_COMMITS_TO_KEEP,
- MAX_COMMITS_TO_KEEP)
+ MAX_COMMITS_TO_KEEP,
+ SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
.build();
}
diff --git
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkFactory.java
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkFactory.java
index b32430b319..212bb6371d 100644
---
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkFactory.java
@@ -19,6 +19,7 @@ package
org.apache.seatunnel.connectors.seatunnel.iceberg.sink;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.sink.SinkCommonOptions;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.connector.TableSink;
@@ -57,7 +58,8 @@ public class IcebergSinkFactory implements TableSinkFactory {
SinkConfig.TABLE_DEFAULT_PARTITION_KEYS,
SinkConfig.TABLE_UPSERT_MODE_ENABLED_PROP,
SinkConfig.TABLE_SCHEMA_EVOLUTION_ENABLED_PROP,
- SinkConfig.TABLES_DEFAULT_COMMIT_BRANCH)
+ SinkConfig.TABLES_DEFAULT_COMMIT_BRANCH,
+ SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
.build();
}
diff --git
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkFactory.java
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkFactory.java
index 81a294e95b..a8c13cdbff 100644
---
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkFactory.java
@@ -19,6 +19,7 @@ package
org.apache.seatunnel.connectors.seatunnel.influxdb.sink;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.sink.SinkCommonOptions;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
@@ -65,7 +66,8 @@ public class InfluxDBSinkFactory implements TableSinkFactory {
KEY_TIME,
BATCH_SIZE,
MAX_RETRIES,
- RETRY_BACKOFF_MULTIPLIER_MS)
+ RETRY_BACKOFF_MULTIPLIER_MS,
+ SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
.build();
}
diff --git
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkFactory.java
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkFactory.java
index 3917d1cd62..beff65521d 100644
---
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkFactory.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.connectors.seatunnel.kudu.sink;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.sink.SinkCommonOptions;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
@@ -56,6 +57,7 @@ public class KuduSinkFactory implements TableSinkFactory {
.optional(KuduSinkConfig.IGNORE_DUPLICATE)
.optional(KuduSinkConfig.ENABLE_KERBEROS)
.optional(KuduSinkConfig.KERBEROS_KRB5_CONF)
+ .optional(SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
.conditional(
KuduSinkConfig.FLUSH_MODE,
Arrays.asList(AUTO_FLUSH_BACKGROUND.name(),
MANUAL_FLUSH.name()),
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java
index 83976d84f9..bbc74df3ce 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.connectors.seatunnel.paimon.sink;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.sink.SinkCommonOptions;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.connector.TableSink;
@@ -54,7 +55,8 @@ public class PaimonSinkFactory implements TableSinkFactory {
PaimonSinkConfig.DATA_SAVE_MODE,
PaimonSinkConfig.PRIMARY_KEYS,
PaimonSinkConfig.PARTITION_KEYS,
- PaimonSinkConfig.WRITE_PROPS)
+ PaimonSinkConfig.WRITE_PROPS,
+ SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
.conditional(
PaimonConfig.CATALOG_TYPE, PaimonCatalogEnum.HIVE,
PaimonConfig.CATALOG_URI)
.build();
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java
index c4768c0618..49c2644d70 100644
---
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.redis.sink;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.sink.SinkCommonOptions;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
@@ -51,7 +52,8 @@ public class RedisSinkFactory implements TableSinkFactory {
RedisConfig.USER,
RedisConfig.KEY_PATTERN,
RedisConfig.FORMAT,
- RedisConfig.EXPIRE)
+ RedisConfig.EXPIRE,
+ SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
.conditional(RedisConfig.MODE, RedisConfig.RedisMode.CLUSTER,
RedisConfig.NODES)
.build();
}
diff --git
a/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/ConnectorSpecificationCheckTest.java
b/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/ConnectorSpecificationCheckTest.java
index 62a037a6f6..3628a5dce6 100644
---
a/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/ConnectorSpecificationCheckTest.java
+++
b/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/ConnectorSpecificationCheckTest.java
@@ -17,7 +17,9 @@
package org.apache.seatunnel.api.connector;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkCommonOptions;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSink;
import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
@@ -152,16 +154,26 @@ public class ConnectorSpecificationCheckTest {
log.info(
"Check sink connector {} successfully",
factory.getClass().getSimpleName());
- checkSupportMultiTableSink(sinkClass);
+ checkSupportMultiTableSink(factory, sinkClass);
}
}
}
- private void checkSupportMultiTableSink(Class<? extends SeaTunnelSink>
sinkClass) {
+ private void checkSupportMultiTableSink(
+ TableSinkFactory sinkFactory, Class<? extends SeaTunnelSink>
sinkClass) {
if (!SupportMultiTableSink.class.isAssignableFrom(sinkClass)) {
return;
}
+ OptionRule sinkOptionRule = sinkFactory.optionRule();
+ Assertions.assertTrue(
+ sinkOptionRule
+ .getOptionalOptions()
+ .contains(SinkCommonOptions.MULTI_TABLE_SINK_REPLICA),
+ "Please add `SinkCommonOptions.MULTI_TABLE_SINK_REPLICA`
optional into the `optionRule` method optional of `"
+ + sinkFactory.getClass().getSimpleName()
+ + "`");
+
// Validate the `createWriter` method return type
Optional<Method> createWriter =
ReflectionUtils.getDeclaredMethod(