This is an automated email from the ASF dual-hosted git repository.

wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 2489f6446b [Improve][Connector] Add multi-table sink option check (#7360)
2489f6446b is described below

commit 2489f6446bae5b971a26e4cd3094f7d7af9d0208
Author: hailin0 <[email protected]>
AuthorDate: Mon Aug 12 13:36:15 2024 +0800

    [Improve][Connector] Add multi-table sink option check (#7360)
    
    * [Improve][Connector] Add multi-table sink option check
    
    * fix
---
 .../org/apache/seatunnel/api/sink/SinkCommonOptions.java |  2 +-
 .../seatunnel/assertion/sink/AssertSinkFactory.java      |  6 +++++-
 .../seatunnel/console/sink/ConsoleSinkFactory.java       |  6 +++++-
 .../connectors/druid/sink/DruidSinkFactory.java          |  6 +++++-
 .../elasticsearch/sink/ElasticsearchSinkFactory.java     |  4 +++-
 .../seatunnel/file/local/sink/LocalFileSinkFactory.java  |  2 ++
 .../seatunnel/file/oss/sink/OssFileSinkFactory.java      |  2 ++
 .../seatunnel/file/s3/sink/S3FileSinkFactory.java        |  2 ++
 .../connectors/seatunnel/http/sink/HttpSinkFactory.java  |  2 ++
 .../connectors/seatunnel/hudi/sink/HudiSinkFactory.java  |  4 +++-
 .../seatunnel/iceberg/sink/IcebergSinkFactory.java       |  4 +++-
 .../seatunnel/influxdb/sink/InfluxDBSinkFactory.java     |  4 +++-
 .../connectors/seatunnel/kudu/sink/KuduSinkFactory.java  |  2 ++
 .../seatunnel/paimon/sink/PaimonSinkFactory.java         |  4 +++-
 .../seatunnel/redis/sink/RedisSinkFactory.java           |  4 +++-
 .../api/connector/ConnectorSpecificationCheckTest.java   | 16 ++++++++++++++--
 16 files changed, 58 insertions(+), 12 deletions(-)

diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommonOptions.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommonOptions.java
index 598193d695..9c6538ac87 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommonOptions.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommonOptions.java
@@ -28,5 +28,5 @@ public class SinkCommonOptions {
             Options.key("multi_table_sink_replica")
                     .intType()
                     .defaultValue(1)
-                    .withDescription("The replica number of multi table sink");
+                    .withDescription("The replica number of multi table sink writer");
 }
diff --git 
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSinkFactory.java
 
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSinkFactory.java
index 376863dc18..ae174d9857 100644
--- 
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSinkFactory.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.connectors.seatunnel.assertion.sink;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.sink.SinkCommonOptions;
 import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
@@ -37,7 +38,10 @@ public class AssertSinkFactory implements TableSinkFactory {
 
     @Override
     public OptionRule optionRule() {
-        return OptionRule.builder().required(RULES).build();
+        return OptionRule.builder()
+                .required(RULES)
+                .optional(SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
+                .build();
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkFactory.java
 
b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkFactory.java
index 169a281fc1..fa5c7deae9 100644
--- 
a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkFactory.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.sink.SinkCommonOptions;
 import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
@@ -52,7 +53,10 @@ public class ConsoleSinkFactory implements TableSinkFactory {
 
     @Override
     public OptionRule optionRule() {
-        return OptionRule.builder().optional(LOG_PRINT_DATA, LOG_PRINT_DELAY).build();
+        return OptionRule.builder()
+                .optional(
+                        LOG_PRINT_DATA, LOG_PRINT_DELAY, SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
+                .build();
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSinkFactory.java
 
b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSinkFactory.java
index 0c6824b521..3199d3d66f 100644
--- 
a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSinkFactory.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.connectors.druid.sink;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.sink.SinkCommonOptions;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
@@ -40,7 +41,10 @@ public class DruidSinkFactory implements TableSinkFactory {
 
     @Override
     public OptionRule optionRule() {
-        return OptionRule.builder().required(COORDINATOR_URL, DATASOURCE).build();
+        return OptionRule.builder()
+                .required(COORDINATOR_URL, DATASOURCE)
+                .optional(SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
+                .build();
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java
index 56ec1d0ab7..b290a63c44 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java
@@ -19,6 +19,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.sink;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.sink.SinkCommonOptions;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.connector.TableSink;
@@ -69,7 +70,8 @@ public class ElasticsearchSinkFactory implements 
TableSinkFactory {
                         TLS_KEY_STORE_PATH,
                         TLS_KEY_STORE_PASSWORD,
                         TLS_TRUST_STORE_PATH,
-                        TLS_TRUST_STORE_PASSWORD)
+                        TLS_TRUST_STORE_PASSWORD,
+                        SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
                 .build();
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java
index e8ee8e436d..1a9bcc1734 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java
@@ -19,6 +19,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.file.local.sink;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.sink.SinkCommonOptions;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
@@ -48,6 +49,7 @@ public class LocalFileSinkFactory extends 
BaseMultipleTableFileSinkFactory {
                 .optional(BaseSinkConfig.FILE_FORMAT_TYPE)
                 .optional(BaseSinkConfig.SCHEMA_SAVE_MODE)
                 .optional(BaseSinkConfig.DATA_SAVE_MODE)
+                .optional(SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
                 .conditional(
                         BaseSinkConfig.FILE_FORMAT_TYPE,
                         FileFormat.TEXT,
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
index 5d6cb649f2..6fd3088ddc 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
@@ -19,6 +19,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.file.oss.sink;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.sink.SinkCommonOptions;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
@@ -102,6 +103,7 @@ public class OssFileSinkFactory extends 
BaseMultipleTableFileSinkFactory {
                 .optional(BaseSinkConfig.DATE_FORMAT)
                 .optional(BaseSinkConfig.DATETIME_FORMAT)
                 .optional(BaseSinkConfig.TIME_FORMAT)
+                .optional(SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
                 .build();
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java
index 4ac9f45915..5c231443e9 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java
@@ -19,6 +19,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.file.s3.sink;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.sink.SinkCommonOptions;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
@@ -103,6 +104,7 @@ public class S3FileSinkFactory implements TableSinkFactory {
                 .optional(BaseSinkConfig.DATETIME_FORMAT)
                 .optional(BaseSinkConfig.TIME_FORMAT)
                 .optional(BaseSinkConfig.TMP_PATH)
+                .optional(SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
                 .build();
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkFactory.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkFactory.java
index 539563ecb6..313d26dd3f 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkFactory.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.connectors.seatunnel.http.sink;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.sink.SinkCommonOptions;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
@@ -49,6 +50,7 @@ public class HttpSinkFactory implements TableSinkFactory {
                 .optional(HttpConfig.RETRY)
                 .optional(HttpConfig.RETRY_BACKOFF_MULTIPLIER_MS)
                 .optional(HttpConfig.RETRY_BACKOFF_MAX_MS)
+                .optional(SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
                 .build();
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSinkFactory.java
 
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSinkFactory.java
index d38785de02..7697842f82 100644
--- 
a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSinkFactory.java
@@ -19,6 +19,7 @@
 package org.apache.seatunnel.connectors.seatunnel.hudi.sink;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.sink.SinkCommonOptions;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
@@ -61,7 +62,8 @@ public class HudiSinkFactory implements TableSinkFactory {
                         INSERT_SHUFFLE_PARALLELISM,
                         UPSERT_SHUFFLE_PARALLELISM,
                         MIN_COMMITS_TO_KEEP,
-                        MAX_COMMITS_TO_KEEP)
+                        MAX_COMMITS_TO_KEEP,
+                        SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
                 .build();
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkFactory.java
 
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkFactory.java
index b32430b319..212bb6371d 100644
--- 
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkFactory.java
@@ -19,6 +19,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.iceberg.sink;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.sink.SinkCommonOptions;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.connector.TableSink;
@@ -57,7 +58,8 @@ public class IcebergSinkFactory implements TableSinkFactory {
                         SinkConfig.TABLE_DEFAULT_PARTITION_KEYS,
                         SinkConfig.TABLE_UPSERT_MODE_ENABLED_PROP,
                         SinkConfig.TABLE_SCHEMA_EVOLUTION_ENABLED_PROP,
-                        SinkConfig.TABLES_DEFAULT_COMMIT_BRANCH)
+                        SinkConfig.TABLES_DEFAULT_COMMIT_BRANCH,
+                        SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
                 .build();
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkFactory.java
 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkFactory.java
index 81a294e95b..a8c13cdbff 100644
--- 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkFactory.java
@@ -19,6 +19,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.influxdb.sink;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.sink.SinkCommonOptions;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
@@ -65,7 +66,8 @@ public class InfluxDBSinkFactory implements TableSinkFactory {
                         KEY_TIME,
                         BATCH_SIZE,
                         MAX_RETRIES,
-                        RETRY_BACKOFF_MULTIPLIER_MS)
+                        RETRY_BACKOFF_MULTIPLIER_MS,
+                        SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
                 .build();
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkFactory.java
 
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkFactory.java
index 3917d1cd62..beff65521d 100644
--- 
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkFactory.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.connectors.seatunnel.kudu.sink;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.sink.SinkCommonOptions;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
@@ -56,6 +57,7 @@ public class KuduSinkFactory implements TableSinkFactory {
                 .optional(KuduSinkConfig.IGNORE_DUPLICATE)
                 .optional(KuduSinkConfig.ENABLE_KERBEROS)
                 .optional(KuduSinkConfig.KERBEROS_KRB5_CONF)
+                .optional(SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
                 .conditional(
                         KuduSinkConfig.FLUSH_MODE,
                         Arrays.asList(AUTO_FLUSH_BACKGROUND.name(), 
MANUAL_FLUSH.name()),
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java
index 83976d84f9..bbc74df3ce 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.connectors.seatunnel.paimon.sink;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.sink.SinkCommonOptions;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.connector.TableSink;
@@ -54,7 +55,8 @@ public class PaimonSinkFactory implements TableSinkFactory {
                         PaimonSinkConfig.DATA_SAVE_MODE,
                         PaimonSinkConfig.PRIMARY_KEYS,
                         PaimonSinkConfig.PARTITION_KEYS,
-                        PaimonSinkConfig.WRITE_PROPS)
+                        PaimonSinkConfig.WRITE_PROPS,
+                        SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
                 .conditional(
                         PaimonConfig.CATALOG_TYPE, PaimonCatalogEnum.HIVE, 
PaimonConfig.CATALOG_URI)
                 .build();
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java
index c4768c0618..49c2644d70 100644
--- 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.connectors.seatunnel.redis.sink;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.sink.SinkCommonOptions;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
@@ -51,7 +52,8 @@ public class RedisSinkFactory implements TableSinkFactory {
                         RedisConfig.USER,
                         RedisConfig.KEY_PATTERN,
                         RedisConfig.FORMAT,
-                        RedisConfig.EXPIRE)
+                        RedisConfig.EXPIRE,
+                        SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
                 .conditional(RedisConfig.MODE, RedisConfig.RedisMode.CLUSTER, 
RedisConfig.NODES)
                 .build();
     }
diff --git 
a/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/ConnectorSpecificationCheckTest.java
 
b/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/ConnectorSpecificationCheckTest.java
index 62a037a6f6..3628a5dce6 100644
--- 
a/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/ConnectorSpecificationCheckTest.java
+++ 
b/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/ConnectorSpecificationCheckTest.java
@@ -17,7 +17,9 @@
 
 package org.apache.seatunnel.api.connector;
 
+import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkCommonOptions;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.sink.SupportMultiTableSink;
 import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
@@ -152,16 +154,26 @@ public class ConnectorSpecificationCheckTest {
                 log.info(
                         "Check sink connector {} successfully", 
factory.getClass().getSimpleName());
 
-                checkSupportMultiTableSink(sinkClass);
+                checkSupportMultiTableSink(factory, sinkClass);
             }
         }
     }
 
-    private void checkSupportMultiTableSink(Class<? extends SeaTunnelSink> sinkClass) {
+    private void checkSupportMultiTableSink(
+            TableSinkFactory sinkFactory, Class<? extends SeaTunnelSink> sinkClass) {
         if (!SupportMultiTableSink.class.isAssignableFrom(sinkClass)) {
             return;
         }
 
+        OptionRule sinkOptionRule = sinkFactory.optionRule();
+        Assertions.assertTrue(
+                sinkOptionRule
+                        .getOptionalOptions()
+                        .contains(SinkCommonOptions.MULTI_TABLE_SINK_REPLICA),
+                "Please add `SinkCommonOptions.MULTI_TABLE_SINK_REPLICA` optional into the `optionRule` method optional of `"
+                        + sinkFactory.getClass().getSimpleName()
+                        + "`");
+
         // Validate the `createWriter` method return type
         Optional<Method> createWriter =
                 ReflectionUtils.getDeclaredMethod(

Reply via email to