This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new c747e02a98 [improve] amazon sqs connector update (#8602)
c747e02a98 is described below

commit c747e02a980e3eff58b75c466c1bcfe2a195e226
Author: Jarvis <[email protected]>
AuthorDate: Fri Feb 7 14:00:52 2025 +0800

    [improve] amazon sqs connector update (#8602)
---
 .../seatunnel/api/ConnectorOptionCheckTest.java    |   1 -
 ...zonSqsConfig.java => AmazonSqsBaseOptions.java} |  18 +--
 .../amazonsqs/config/AmazonSqsSinkOptions.java     |  20 ++++
 ...urceOptions.java => AmazonSqsSourceConfig.java} |  31 ++----
 .../amazonsqs/config/AmazonSqsSourceOptions.java   |  60 ++++------
 .../amazonsqs/sink/AmazonSqsSinkFactory.java       |  13 ++-
 .../amazonsqs/sink/AmazonSqsSinkWriter.java        |  14 +--
 .../amazonsqs/source/AmazonSqsSource.java          | 122 ++++-----------------
 .../amazonsqs/source/AmazonSqsSourceFactory.java   | 101 ++++++++++++++++-
 .../amazonsqs/source/AmazonSqsSourceReader.java    |  30 ++---
 10 files changed, 200 insertions(+), 210 deletions(-)

diff --git 
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
 
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
index 2a814b08ef..88f4f69a29 100644
--- 
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
+++ 
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
@@ -194,7 +194,6 @@ public class ConnectorOptionCheckTest {
         whiteList.add("EasysearchSourceOptions");
         whiteList.add("RabbitmqSinkOptions");
         whiteList.add("StarRocksSourceOptions");
-        whiteList.add("AmazonSqsSinkOptions");
         whiteList.add("IcebergSourceOptions");
         whiteList.add("HbaseSourceOptions");
         whiteList.add("PaimonSourceOptions");
diff --git 
a/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/config/AmazonSqsConfig.java
 
b/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/config/AmazonSqsBaseOptions.java
similarity index 76%
rename from 
seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/config/AmazonSqsConfig.java
rename to 
seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/config/AmazonSqsBaseOptions.java
index 505756908c..3b9e23771c 100644
--- 
a/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/config/AmazonSqsConfig.java
+++ 
b/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/config/AmazonSqsBaseOptions.java
@@ -22,7 +22,7 @@ import org.apache.seatunnel.api.configuration.Options;
 
 import java.io.Serializable;
 
-public class AmazonSqsConfig implements Serializable {
+public class AmazonSqsBaseOptions implements Serializable {
 
     public static final String DEFAULT_FIELD_DELIMITER = ",";
 
@@ -47,11 +47,6 @@ public class AmazonSqsConfig implements Serializable {
                     .noDefaultValue()
                     .withDescription("The access secret key of Amazon SQS 
Service");
 
-    public static final Option<String> MESSAGE_GROUP_ID =
-            Options.key("message_group_id")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("The message group id of Amazon SQS 
Service");
     public static final Option<MessageFormat> FORMAT =
             Options.key("format")
                     .enumType(MessageFormat.class)
@@ -64,15 +59,4 @@ public class AmazonSqsConfig implements Serializable {
                     .stringType()
                     .noDefaultValue()
                     .withDescription("Customize the field delimiter for data 
format.");
-    public static final Option<Boolean> DEBEZIUM_RECORD_INCLUDE_SCHEMA =
-            Options.key("debezium_record_include_schema")
-                    .booleanType()
-                    .defaultValue(true)
-                    .withDescription("Does the debezium record carry a 
schema.");
-
-    public static final Option<Boolean> DELETE_MESSAGE =
-            Options.key("delete_message")
-                    .booleanType()
-                    .defaultValue(false)
-                    .withDescription("Delete the message after it is consumed 
if set true.");
 }
diff --git 
a/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/config/AmazonSqsSinkOptions.java
 
b/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/config/AmazonSqsSinkOptions.java
new file mode 100644
index 0000000000..13502ca1f8
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/config/AmazonSqsSinkOptions.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.amazonsqs.config;
+
+public class AmazonSqsSinkOptions extends AmazonSqsBaseOptions {}
diff --git 
a/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/config/AmazonSqsSourceOptions.java
 
b/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/config/AmazonSqsSourceConfig.java
similarity index 52%
copy from 
seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/config/AmazonSqsSourceOptions.java
copy to 
seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/config/AmazonSqsSourceConfig.java
index acb4d6831c..58113a0d91 100644
--- 
a/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/config/AmazonSqsSourceOptions.java
+++ 
b/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/config/AmazonSqsSourceConfig.java
@@ -19,7 +19,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
-import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 
 import lombok.AllArgsConstructor;
 import lombok.Data;
@@ -28,7 +28,7 @@ import java.io.Serializable;
 
 @Data
 @AllArgsConstructor
-public class AmazonSqsSourceOptions implements Serializable {
+public class AmazonSqsSourceConfig implements Serializable {
 
     private String url;
 
@@ -44,23 +44,14 @@ public class AmazonSqsSourceOptions implements Serializable 
{
 
     private Config schema;
 
-    public AmazonSqsSourceOptions(Config config) {
-        this.url = config.getString(AmazonSqsConfig.URL.key());
-        this.region = config.getString(AmazonSqsConfig.REGION.key());
-        if (config.hasPath(AmazonSqsConfig.ACCESS_KEY_ID.key())) {
-            this.accessKeyId = 
config.getString(AmazonSqsConfig.ACCESS_KEY_ID.key());
-        }
-        if (config.hasPath(AmazonSqsConfig.SECRET_ACCESS_KEY.key())) {
-            this.secretAccessKey = 
config.getString(AmazonSqsConfig.SECRET_ACCESS_KEY.key());
-        }
-        if (config.hasPath(AmazonSqsConfig.MESSAGE_GROUP_ID.key())) {
-            this.messageGroupId = 
config.getString(AmazonSqsConfig.MESSAGE_GROUP_ID.key());
-        }
-        if (config.hasPath(AmazonSqsConfig.DELETE_MESSAGE.key())) {
-            this.deleteMessage = 
config.getBoolean(AmazonSqsConfig.DELETE_MESSAGE.key());
-        }
-        if (config.hasPath(TableSchemaOptions.SCHEMA.key())) {
-            this.schema = config.getConfig(TableSchemaOptions.SCHEMA.key());
-        }
+    public AmazonSqsSourceConfig(ReadonlyConfig config) {
+        this.url = config.get(AmazonSqsSourceOptions.URL);
+        this.region = config.get(AmazonSqsSourceOptions.REGION);
+        this.accessKeyId = config.get(AmazonSqsSourceOptions.ACCESS_KEY_ID);
+        this.secretAccessKey = 
config.get(AmazonSqsSourceOptions.SECRET_ACCESS_KEY);
+        this.messageGroupId = 
config.get(AmazonSqsSourceOptions.MESSAGE_GROUP_ID);
+        this.deleteMessage = config.get(AmazonSqsSourceOptions.DELETE_MESSAGE);
+        this.schema = 
ReadonlyConfig.fromMap(config.get(AmazonSqsSourceOptions.SCHEMA)).toConfig();
+        ;
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/config/AmazonSqsSourceOptions.java
 
b/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/config/AmazonSqsSourceOptions.java
index acb4d6831c..07ee216fe3 100644
--- 
a/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/config/AmazonSqsSourceOptions.java
+++ 
b/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/config/AmazonSqsSourceOptions.java
@@ -17,50 +17,30 @@
 
 package org.apache.seatunnel.connectors.seatunnel.amazonsqs.config;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
 import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
 
-import lombok.AllArgsConstructor;
-import lombok.Data;
-
-import java.io.Serializable;
-
-@Data
-@AllArgsConstructor
-public class AmazonSqsSourceOptions implements Serializable {
-
-    private String url;
-
-    private String region;
-
-    private String accessKeyId;
-
-    private String secretAccessKey;
+import java.util.Map;
 
-    private String messageGroupId;
+public class AmazonSqsSourceOptions extends AmazonSqsBaseOptions {
+    public static final Option<Map<String, Object>> SCHEMA = 
TableSchemaOptions.SCHEMA;
 
-    private boolean deleteMessage;
+    public static final Option<Boolean> DELETE_MESSAGE =
+            Options.key("delete_message")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Delete the message after it is consumed 
if set true.");
 
-    private Config schema;
+    public static final Option<String> MESSAGE_GROUP_ID =
+            Options.key("message_group_id")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The message group id of Amazon SQS 
Service");
 
-    public AmazonSqsSourceOptions(Config config) {
-        this.url = config.getString(AmazonSqsConfig.URL.key());
-        this.region = config.getString(AmazonSqsConfig.REGION.key());
-        if (config.hasPath(AmazonSqsConfig.ACCESS_KEY_ID.key())) {
-            this.accessKeyId = 
config.getString(AmazonSqsConfig.ACCESS_KEY_ID.key());
-        }
-        if (config.hasPath(AmazonSqsConfig.SECRET_ACCESS_KEY.key())) {
-            this.secretAccessKey = 
config.getString(AmazonSqsConfig.SECRET_ACCESS_KEY.key());
-        }
-        if (config.hasPath(AmazonSqsConfig.MESSAGE_GROUP_ID.key())) {
-            this.messageGroupId = 
config.getString(AmazonSqsConfig.MESSAGE_GROUP_ID.key());
-        }
-        if (config.hasPath(AmazonSqsConfig.DELETE_MESSAGE.key())) {
-            this.deleteMessage = 
config.getBoolean(AmazonSqsConfig.DELETE_MESSAGE.key());
-        }
-        if (config.hasPath(TableSchemaOptions.SCHEMA.key())) {
-            this.schema = config.getConfig(TableSchemaOptions.SCHEMA.key());
-        }
-    }
+    public static final Option<Boolean> DEBEZIUM_RECORD_INCLUDE_SCHEMA =
+            Options.key("debezium_record_include_schema")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription("Does the debezium record carry a 
schema.");
 }
diff --git 
a/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/sink/AmazonSqsSinkFactory.java
 
b/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/sink/AmazonSqsSinkFactory.java
index 030e9d221a..50ea762740 100644
--- 
a/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/sink/AmazonSqsSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/sink/AmazonSqsSinkFactory.java
@@ -27,8 +27,12 @@ import 
org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
 
 import com.google.auto.service.AutoService;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsConfig.REGION;
-import static 
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsConfig.URL;
+import static 
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSinkOptions.ACCESS_KEY_ID;
+import static 
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSinkOptions.FIELD_DELIMITER;
+import static 
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSinkOptions.FORMAT;
+import static 
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSinkOptions.REGION;
+import static 
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSinkOptions.SECRET_ACCESS_KEY;
+import static 
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSinkOptions.URL;
 
 @AutoService(Factory.class)
 public class AmazonSqsSinkFactory implements TableSinkFactory {
@@ -46,6 +50,9 @@ public class AmazonSqsSinkFactory implements TableSinkFactory 
{
 
     @Override
     public OptionRule optionRule() {
-        return OptionRule.builder().required(URL, REGION).build();
+        return OptionRule.builder()
+                .required(URL, REGION)
+                .optional(ACCESS_KEY_ID, SECRET_ACCESS_KEY, FORMAT, 
FIELD_DELIMITER)
+                .build();
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/sink/AmazonSqsSinkWriter.java
 
b/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/sink/AmazonSqsSinkWriter.java
index eb95c95b1d..c7ba34fdc1 100644
--- 
a/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/sink/AmazonSqsSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/sink/AmazonSqsSinkWriter.java
@@ -41,13 +41,13 @@ import java.io.IOException;
 import java.net.URI;
 import java.nio.charset.StandardCharsets;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsConfig.ACCESS_KEY_ID;
-import static 
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsConfig.DEFAULT_FIELD_DELIMITER;
-import static 
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsConfig.FIELD_DELIMITER;
-import static 
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsConfig.FORMAT;
-import static 
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsConfig.REGION;
-import static 
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsConfig.SECRET_ACCESS_KEY;
-import static 
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsConfig.URL;
+import static 
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSinkOptions.ACCESS_KEY_ID;
+import static 
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSinkOptions.DEFAULT_FIELD_DELIMITER;
+import static 
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSinkOptions.FIELD_DELIMITER;
+import static 
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSinkOptions.FORMAT;
+import static 
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSinkOptions.REGION;
+import static 
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSinkOptions.SECRET_ACCESS_KEY;
+import static 
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSinkOptions.URL;
 
 public class AmazonSqsSinkWriter extends AbstractSinkWriter<SeaTunnelRow, 
Void> {
 
diff --git 
a/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/source/AmazonSqsSource.java
 
b/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/source/AmazonSqsSource.java
index c4c3ad372d..ee0d078ab1 100644
--- 
a/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/source/AmazonSqsSource.java
+++ 
b/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/source/AmazonSqsSource.java
@@ -17,79 +17,41 @@
 
 package org.apache.seatunnel.connectors.seatunnel.amazonsqs.source;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
-import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.serialization.DeserializationSchema;
 import org.apache.seatunnel.api.source.Boundedness;
-import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SupportColumnProjection;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
-import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
-import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
-import 
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSourceOptions;
-import 
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.MessageFormat;
-import 
org.apache.seatunnel.connectors.seatunnel.amazonsqs.exception.AmazonSqsConnectorException;
+import 
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSourceConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
 import 
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
 import 
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
-import org.apache.seatunnel.format.json.JsonDeserializationSchema;
-import org.apache.seatunnel.format.json.canal.CanalJsonDeserializationSchema;
-import 
org.apache.seatunnel.format.json.debezium.DebeziumJsonDeserializationSchema;
-import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
-import org.apache.seatunnel.format.text.TextDeserializationSchema;
-import org.apache.seatunnel.format.text.constant.TextFormatConstant;
 
-import com.google.auto.service.AutoService;
 import lombok.extern.slf4j.Slf4j;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsConfig.DEBEZIUM_RECORD_INCLUDE_SCHEMA;
-import static 
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsConfig.DEFAULT_FIELD_DELIMITER;
-import static 
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsConfig.FIELD_DELIMITER;
-import static 
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsConfig.FORMAT;
-import static 
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsConfig.REGION;
-import static 
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsConfig.URL;
+import java.util.Collections;
+import java.util.List;
 
 @Slf4j
-@AutoService(SeaTunnelSource.class)
 public class AmazonSqsSource extends AbstractSingleSplitSource<SeaTunnelRow>
         implements SupportColumnProjection {
 
-    private AmazonSqsSourceOptions amazonSqsSourceOptions;
+    private AmazonSqsSourceConfig amazonSqsSourceConfig;
     private DeserializationSchema<SeaTunnelRow> deserializationSchema;
-    private SeaTunnelRowType typeInfo;
     private CatalogTable catalogTable;
 
-    @Override
-    public String getPluginName() {
-        return "AmazonSqs";
+    public AmazonSqsSource(
+            AmazonSqsSourceConfig amazonSqsSourceConfig,
+            CatalogTable catalogTable,
+            DeserializationSchema<SeaTunnelRow> deserializationSchema) {
+        this.amazonSqsSourceConfig = amazonSqsSourceConfig;
+        this.catalogTable = catalogTable;
+        this.deserializationSchema = deserializationSchema;
     }
 
     @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
-        CheckResult result =
-                CheckConfigUtil.checkAllExists(
-                        pluginConfig, URL.key(), REGION.key(), 
TableSchemaOptions.SCHEMA.key());
-        if (!result.isSuccess()) {
-            throw new AmazonSqsConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SOURCE, 
result.getMsg()));
-        }
-        this.amazonSqsSourceOptions = new AmazonSqsSourceOptions(pluginConfig);
-        this.catalogTable = CatalogTableUtil.buildWithConfig(pluginConfig);
-        this.typeInfo = catalogTable.getSeaTunnelRowType();
-        setDeserialization(pluginConfig);
+    public String getPluginName() {
+        return "AmazonSqs";
     }
 
     @Override
@@ -98,63 +60,17 @@ public class AmazonSqsSource extends 
AbstractSingleSplitSource<SeaTunnelRow>
     }
 
     @Override
-    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
-        return this.typeInfo;
+    public List<CatalogTable> getProducedCatalogTables() {
+        return Collections.singletonList(catalogTable);
     }
 
     @Override
     public AbstractSingleSplitReader<SeaTunnelRow> createReader(
             SingleSplitReaderContext readerContext) throws Exception {
         return new AmazonSqsSourceReader(
-                readerContext, amazonSqsSourceOptions, deserializationSchema, 
typeInfo);
-    }
-
-    private void setDeserialization(Config config) {
-        if (config.hasPath(TableSchemaOptions.SCHEMA.key())) {
-            MessageFormat format = 
ReadonlyConfig.fromConfig(config).get(FORMAT);
-            switch (format) {
-                case JSON:
-                    deserializationSchema =
-                            new JsonDeserializationSchema(catalogTable, false, 
false);
-                    break;
-                case TEXT:
-                    String delimiter = DEFAULT_FIELD_DELIMITER;
-                    if (config.hasPath(FIELD_DELIMITER.key())) {
-                        delimiter = config.getString(FIELD_DELIMITER.key());
-                    }
-                    deserializationSchema =
-                            TextDeserializationSchema.builder()
-                                    .seaTunnelRowType(typeInfo)
-                                    .delimiter(delimiter)
-                                    .build();
-                    break;
-                case CANAL_JSON:
-                    deserializationSchema =
-                            
CanalJsonDeserializationSchema.builder(catalogTable)
-                                    .setIgnoreParseErrors(true)
-                                    .build();
-                    break;
-                case DEBEZIUM_JSON:
-                    boolean includeSchema = 
DEBEZIUM_RECORD_INCLUDE_SCHEMA.defaultValue();
-                    if (config.hasPath(DEBEZIUM_RECORD_INCLUDE_SCHEMA.key())) {
-                        includeSchema = 
config.getBoolean(DEBEZIUM_RECORD_INCLUDE_SCHEMA.key());
-                    }
-                    deserializationSchema =
-                            new DebeziumJsonDeserializationSchema(
-                                    catalogTable, true, includeSchema);
-                    break;
-                default:
-                    throw new SeaTunnelJsonFormatException(
-                            CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
-                            "Unsupported format: " + format);
-            }
-        } else {
-            typeInfo = CatalogTableUtil.buildSimpleTextSchema();
-            this.deserializationSchema =
-                    TextDeserializationSchema.builder()
-                            .seaTunnelRowType(typeInfo)
-                            .delimiter(TextFormatConstant.PLACEHOLDER)
-                            .build();
-        }
+                readerContext,
+                amazonSqsSourceConfig,
+                deserializationSchema,
+                catalogTable.getSeaTunnelRowType());
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/source/AmazonSqsSourceFactory.java
 
b/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/source/AmazonSqsSourceFactory.java
index 4f05b0b8f6..08d631ee11 100644
--- 
a/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/source/AmazonSqsSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/source/AmazonSqsSourceFactory.java
@@ -17,16 +17,44 @@
 
 package org.apache.seatunnel.connectors.seatunnel.amazonsqs.source;
 
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.connector.TableSource;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import 
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSourceConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.MessageFormat;
+import org.apache.seatunnel.format.json.JsonDeserializationSchema;
+import org.apache.seatunnel.format.json.canal.CanalJsonDeserializationSchema;
+import 
org.apache.seatunnel.format.json.debezium.DebeziumJsonDeserializationSchema;
+import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
+import org.apache.seatunnel.format.text.TextDeserializationSchema;
 
 import com.google.auto.service.AutoService;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsConfig.REGION;
-import static 
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsConfig.URL;
+import java.io.Serializable;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSourceOptions.ACCESS_KEY_ID;
+import static 
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSourceOptions.DEBEZIUM_RECORD_INCLUDE_SCHEMA;
+import static 
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSourceOptions.DEFAULT_FIELD_DELIMITER;
+import static 
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSourceOptions.DELETE_MESSAGE;
+import static 
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSourceOptions.FIELD_DELIMITER;
+import static 
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSourceOptions.FORMAT;
+import static 
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSourceOptions.MESSAGE_GROUP_ID;
+import static 
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSourceOptions.REGION;
+import static 
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSourceOptions.SCHEMA;
+import static 
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSourceOptions.SECRET_ACCESS_KEY;
+import static 
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSourceOptions.URL;
 
 @AutoService(Factory.class)
 public class AmazonSqsSourceFactory implements TableSourceFactory {
@@ -37,11 +65,76 @@ public class AmazonSqsSourceFactory implements 
TableSourceFactory {
 
     @Override
     public OptionRule optionRule() {
-        return OptionRule.builder().required(URL, REGION, 
TableSchemaOptions.SCHEMA).build();
+        return OptionRule.builder()
+                .required(URL, REGION, SCHEMA)
+                .optional(
+                        ACCESS_KEY_ID,
+                        SECRET_ACCESS_KEY,
+                        MESSAGE_GROUP_ID,
+                        DELETE_MESSAGE,
+                        FORMAT,
+                        FIELD_DELIMITER,
+                        DEBEZIUM_RECORD_INCLUDE_SCHEMA)
+                .build();
+    }
+
+    @Override
+    public <T, SplitT extends SourceSplit, StateT extends Serializable>
+            TableSource<T, SplitT, StateT> 
createSource(TableSourceFactoryContext context) {
+        CatalogTable catalogTable = 
CatalogTableUtil.buildWithConfig(context.getOptions());
+        DeserializationSchema<SeaTunnelRow> deserializationSchema =
+                setDeserialization(context.getOptions().toConfig(), 
catalogTable);
+        return () ->
+                (SeaTunnelSource<T, SplitT, StateT>)
+                        new AmazonSqsSource(
+                                new 
AmazonSqsSourceConfig(context.getOptions()),
+                                catalogTable,
+                                deserializationSchema);
     }
 
     @Override
     public Class<? extends SeaTunnelSource> getSourceClass() {
         return AmazonSqsSource.class;
     }
+
+    private DeserializationSchema<SeaTunnelRow> setDeserialization(
+            Config config, CatalogTable catalogTable) {
+        DeserializationSchema<SeaTunnelRow> deserializationSchema;
+        MessageFormat format = ReadonlyConfig.fromConfig(config).get(FORMAT);
+        switch (format) {
+            case JSON:
+                deserializationSchema = new 
JsonDeserializationSchema(catalogTable, false, false);
+                break;
+            case TEXT:
+                String delimiter = DEFAULT_FIELD_DELIMITER;
+                if (config.hasPath(FIELD_DELIMITER.key())) {
+                    delimiter = config.getString(FIELD_DELIMITER.key());
+                }
+                deserializationSchema =
+                        TextDeserializationSchema.builder()
+                                
.seaTunnelRowType(catalogTable.getSeaTunnelRowType())
+                                .delimiter(delimiter)
+                                .build();
+                break;
+            case CANAL_JSON:
+                deserializationSchema =
+                        CanalJsonDeserializationSchema.builder(catalogTable)
+                                .setIgnoreParseErrors(true)
+                                .build();
+                break;
+            case DEBEZIUM_JSON:
+                boolean includeSchema = 
DEBEZIUM_RECORD_INCLUDE_SCHEMA.defaultValue();
+                if (config.hasPath(DEBEZIUM_RECORD_INCLUDE_SCHEMA.key())) {
+                    includeSchema = 
config.getBoolean(DEBEZIUM_RECORD_INCLUDE_SCHEMA.key());
+                }
+                deserializationSchema =
+                        new DebeziumJsonDeserializationSchema(catalogTable, 
true, includeSchema);
+                break;
+            default:
+                throw new SeaTunnelJsonFormatException(
+                        CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
+                        "Unsupported format: " + format);
+        }
+        return deserializationSchema;
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/source/AmazonSqsSourceReader.java
 
b/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/source/AmazonSqsSourceReader.java
index bc28cf7b8e..309b328ee1 100644
--- 
a/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/source/AmazonSqsSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/source/AmazonSqsSourceReader.java
@@ -21,7 +21,7 @@ import 
org.apache.seatunnel.api.serialization.DeserializationSchema;
 import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import 
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSourceOptions;
+import 
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSourceConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.amazonsqs.deserialize.AmazonSqsDeserializer;
 import 
org.apache.seatunnel.connectors.seatunnel.amazonsqs.deserialize.SeaTunnelRowDeserializer;
 import 
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
@@ -47,40 +47,40 @@ public class AmazonSqsSourceReader extends 
AbstractSingleSplitReader<SeaTunnelRo
 
     protected SqsClient sqsClient;
     protected SingleSplitReaderContext context;
-    protected AmazonSqsSourceOptions amazonSqsSourceOptions;
+    protected AmazonSqsSourceConfig amazonSqsSourceConfig;
     private final SeaTunnelRowDeserializer seaTunnelRowDeserializer;
 
     public AmazonSqsSourceReader(
             SingleSplitReaderContext context,
-            AmazonSqsSourceOptions amazonSqsSourceOptions,
+            AmazonSqsSourceConfig amazonSqsSourceConfig,
             DeserializationSchema<SeaTunnelRow> deserializationSchema,
             SeaTunnelRowType seaTunnelRowType) {
         this.context = context;
-        this.amazonSqsSourceOptions = amazonSqsSourceOptions;
+        this.amazonSqsSourceConfig = amazonSqsSourceConfig;
         this.seaTunnelRowDeserializer = new 
AmazonSqsDeserializer(deserializationSchema);
     }
 
     @Override
     public void open() throws Exception {
-        if (amazonSqsSourceOptions.getAccessKeyId() != null
-                & amazonSqsSourceOptions.getSecretAccessKey() != null) {
+        if (amazonSqsSourceConfig.getAccessKeyId() != null
+                & amazonSqsSourceConfig.getSecretAccessKey() != null) {
             sqsClient =
                     SqsClient.builder()
-                            
.endpointOverride(URI.create(amazonSqsSourceOptions.getUrl()))
+                            
.endpointOverride(URI.create(amazonSqsSourceConfig.getUrl()))
                             // The region is meaningless for local Sqs but 
required for client
                             // builder validation
-                            
.region(Region.of(amazonSqsSourceOptions.getRegion()))
+                            
.region(Region.of(amazonSqsSourceConfig.getRegion()))
                             .credentialsProvider(
                                     StaticCredentialsProvider.create(
                                             AwsBasicCredentials.create(
-                                                    
amazonSqsSourceOptions.getAccessKeyId(),
-                                                    
amazonSqsSourceOptions.getSecretAccessKey())))
+                                                    
amazonSqsSourceConfig.getAccessKeyId(),
+                                                    
amazonSqsSourceConfig.getSecretAccessKey())))
                             .build();
         } else {
             sqsClient =
                     SqsClient.builder()
-                            
.endpointOverride(URI.create(amazonSqsSourceOptions.getUrl()))
-                            
.region(Region.of(amazonSqsSourceOptions.getRegion()))
+                            
.endpointOverride(URI.create(amazonSqsSourceConfig.getUrl()))
+                            
.region(Region.of(amazonSqsSourceConfig.getRegion()))
                             
.credentialsProvider(DefaultCredentialsProvider.create())
                             .build();
         }
@@ -96,7 +96,7 @@ public class AmazonSqsSourceReader extends 
AbstractSingleSplitReader<SeaTunnelRo
     public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
         ReceiveMessageRequest receiveMessageRequest =
                 ReceiveMessageRequest.builder()
-                        .queueUrl(amazonSqsSourceOptions.getUrl())
+                        .queueUrl(amazonSqsSourceConfig.getUrl())
                         .maxNumberOfMessages(10) // Adjust the batch size as 
needed
                         .waitTimeSeconds(10) // Adjust the wait time as needed
                         .build();
@@ -110,10 +110,10 @@ public class AmazonSqsSourceReader extends 
AbstractSingleSplitReader<SeaTunnelRo
             output.collect(seaTunnelRow);
 
             // Delete the processed message
-            if (amazonSqsSourceOptions.isDeleteMessage()) {
+            if (amazonSqsSourceConfig.isDeleteMessage()) {
                 DeleteMessageRequest deleteMessageRequest =
                         DeleteMessageRequest.builder()
-                                .queueUrl(amazonSqsSourceOptions.getUrl())
+                                .queueUrl(amazonSqsSourceConfig.getUrl())
                                 .receiptHandle(message.receiptHandle())
                                 .build();
                 sqsClient.deleteMessage(deleteMessageRequest);


Reply via email to