This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new c747e02a98 [improve] amazon sqs connector update (#8602)
c747e02a98 is described below
commit c747e02a980e3eff58b75c466c1bcfe2a195e226
Author: Jarvis <[email protected]>
AuthorDate: Fri Feb 7 14:00:52 2025 +0800
[improve] amazon sqs connector update (#8602)
---
.../seatunnel/api/ConnectorOptionCheckTest.java | 1 -
...zonSqsConfig.java => AmazonSqsBaseOptions.java} | 18 +--
.../amazonsqs/config/AmazonSqsSinkOptions.java | 20 ++++
...urceOptions.java => AmazonSqsSourceConfig.java} | 31 ++----
.../amazonsqs/config/AmazonSqsSourceOptions.java | 60 ++++------
.../amazonsqs/sink/AmazonSqsSinkFactory.java | 13 ++-
.../amazonsqs/sink/AmazonSqsSinkWriter.java | 14 +--
.../amazonsqs/source/AmazonSqsSource.java | 122 ++++-----------------
.../amazonsqs/source/AmazonSqsSourceFactory.java | 101 ++++++++++++++++-
.../amazonsqs/source/AmazonSqsSourceReader.java | 30 ++---
10 files changed, 200 insertions(+), 210 deletions(-)
diff --git a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
index 2a814b08ef..88f4f69a29 100644
--- a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
+++ b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
@@ -194,7 +194,6 @@ public class ConnectorOptionCheckTest {
whiteList.add("EasysearchSourceOptions");
whiteList.add("RabbitmqSinkOptions");
whiteList.add("StarRocksSourceOptions");
- whiteList.add("AmazonSqsSinkOptions");
whiteList.add("IcebergSourceOptions");
whiteList.add("HbaseSourceOptions");
whiteList.add("PaimonSourceOptions");
diff --git a/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/config/AmazonSqsConfig.java b/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/config/AmazonSqsBaseOptions.java
similarity index 76%
rename from seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/config/AmazonSqsConfig.java
rename to seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/config/AmazonSqsBaseOptions.java
index 505756908c..3b9e23771c 100644
--- a/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/config/AmazonSqsConfig.java
+++ b/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/config/AmazonSqsBaseOptions.java
@@ -22,7 +22,7 @@ import org.apache.seatunnel.api.configuration.Options;
import java.io.Serializable;
-public class AmazonSqsConfig implements Serializable {
+public class AmazonSqsBaseOptions implements Serializable {
public static final String DEFAULT_FIELD_DELIMITER = ",";
@@ -47,11 +47,6 @@ public class AmazonSqsConfig implements Serializable {
.noDefaultValue()
.withDescription("The access secret key of Amazon SQS Service");
- public static final Option<String> MESSAGE_GROUP_ID =
- Options.key("message_group_id")
- .stringType()
- .noDefaultValue()
- .withDescription("The message group id of Amazon SQS Service");
public static final Option<MessageFormat> FORMAT =
Options.key("format")
.enumType(MessageFormat.class)
@@ -64,15 +59,4 @@ public class AmazonSqsConfig implements Serializable {
.stringType()
.noDefaultValue()
.withDescription("Customize the field delimiter for data format.");
- public static final Option<Boolean> DEBEZIUM_RECORD_INCLUDE_SCHEMA =
- Options.key("debezium_record_include_schema")
- .booleanType()
- .defaultValue(true)
- .withDescription("Does the debezium record carry a schema.");
-
- public static final Option<Boolean> DELETE_MESSAGE =
- Options.key("delete_message")
- .booleanType()
- .defaultValue(false)
- .withDescription("Delete the message after it is consumed if set true.");
}
diff --git a/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/config/AmazonSqsSinkOptions.java b/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/config/AmazonSqsSinkOptions.java
new file mode 100644
index 0000000000..13502ca1f8
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/config/AmazonSqsSinkOptions.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.amazonsqs.config;
+
+public class AmazonSqsSinkOptions extends AmazonSqsBaseOptions {}
diff --git a/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/config/AmazonSqsSourceOptions.java b/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/config/AmazonSqsSourceConfig.java
similarity index 52%
copy from seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/config/AmazonSqsSourceOptions.java
copy to seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/config/AmazonSqsSourceConfig.java
index acb4d6831c..58113a0d91 100644
--- a/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/config/AmazonSqsSourceOptions.java
+++ b/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/config/AmazonSqsSourceConfig.java
@@ -19,7 +19,7 @@ package org.apache.seatunnel.connectors.seatunnel.amazonsqs.config;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import lombok.AllArgsConstructor;
import lombok.Data;
@@ -28,7 +28,7 @@ import java.io.Serializable;
@Data
@AllArgsConstructor
-public class AmazonSqsSourceOptions implements Serializable {
+public class AmazonSqsSourceConfig implements Serializable {
private String url;
@@ -44,23 +44,14 @@ public class AmazonSqsSourceOptions implements Serializable {
private Config schema;
- public AmazonSqsSourceOptions(Config config) {
- this.url = config.getString(AmazonSqsConfig.URL.key());
- this.region = config.getString(AmazonSqsConfig.REGION.key());
- if (config.hasPath(AmazonSqsConfig.ACCESS_KEY_ID.key())) {
- this.accessKeyId =
config.getString(AmazonSqsConfig.ACCESS_KEY_ID.key());
- }
- if (config.hasPath(AmazonSqsConfig.SECRET_ACCESS_KEY.key())) {
- this.secretAccessKey =
config.getString(AmazonSqsConfig.SECRET_ACCESS_KEY.key());
- }
- if (config.hasPath(AmazonSqsConfig.MESSAGE_GROUP_ID.key())) {
- this.messageGroupId =
config.getString(AmazonSqsConfig.MESSAGE_GROUP_ID.key());
- }
- if (config.hasPath(AmazonSqsConfig.DELETE_MESSAGE.key())) {
- this.deleteMessage =
config.getBoolean(AmazonSqsConfig.DELETE_MESSAGE.key());
- }
- if (config.hasPath(TableSchemaOptions.SCHEMA.key())) {
- this.schema = config.getConfig(TableSchemaOptions.SCHEMA.key());
- }
+ public AmazonSqsSourceConfig(ReadonlyConfig config) {
+ this.url = config.get(AmazonSqsSourceOptions.URL);
+ this.region = config.get(AmazonSqsSourceOptions.REGION);
+ this.accessKeyId = config.get(AmazonSqsSourceOptions.ACCESS_KEY_ID);
+ this.secretAccessKey =
config.get(AmazonSqsSourceOptions.SECRET_ACCESS_KEY);
+ this.messageGroupId =
config.get(AmazonSqsSourceOptions.MESSAGE_GROUP_ID);
+ this.deleteMessage = config.get(AmazonSqsSourceOptions.DELETE_MESSAGE);
+ this.schema =
ReadonlyConfig.fromMap(config.get(AmazonSqsSourceOptions.SCHEMA)).toConfig();
+ ;
}
}
diff --git a/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/config/AmazonSqsSourceOptions.java b/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/config/AmazonSqsSourceOptions.java
index acb4d6831c..07ee216fe3 100644
--- a/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/config/AmazonSqsSourceOptions.java
+++ b/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/config/AmazonSqsSourceOptions.java
@@ -17,50 +17,30 @@
package org.apache.seatunnel.connectors.seatunnel.amazonsqs.config;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
-import lombok.AllArgsConstructor;
-import lombok.Data;
-
-import java.io.Serializable;
-
-@Data
-@AllArgsConstructor
-public class AmazonSqsSourceOptions implements Serializable {
-
- private String url;
-
- private String region;
-
- private String accessKeyId;
-
- private String secretAccessKey;
+import java.util.Map;
- private String messageGroupId;
+public class AmazonSqsSourceOptions extends AmazonSqsBaseOptions {
+ public static final Option<Map<String, Object>> SCHEMA =
TableSchemaOptions.SCHEMA;
- private boolean deleteMessage;
+ public static final Option<Boolean> DELETE_MESSAGE =
+ Options.key("delete_message")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Delete the message after it is consumed if set true.");
- private Config schema;
+ public static final Option<String> MESSAGE_GROUP_ID =
+ Options.key("message_group_id")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The message group id of Amazon SQS Service");
- public AmazonSqsSourceOptions(Config config) {
- this.url = config.getString(AmazonSqsConfig.URL.key());
- this.region = config.getString(AmazonSqsConfig.REGION.key());
- if (config.hasPath(AmazonSqsConfig.ACCESS_KEY_ID.key())) {
- this.accessKeyId =
config.getString(AmazonSqsConfig.ACCESS_KEY_ID.key());
- }
- if (config.hasPath(AmazonSqsConfig.SECRET_ACCESS_KEY.key())) {
- this.secretAccessKey =
config.getString(AmazonSqsConfig.SECRET_ACCESS_KEY.key());
- }
- if (config.hasPath(AmazonSqsConfig.MESSAGE_GROUP_ID.key())) {
- this.messageGroupId =
config.getString(AmazonSqsConfig.MESSAGE_GROUP_ID.key());
- }
- if (config.hasPath(AmazonSqsConfig.DELETE_MESSAGE.key())) {
- this.deleteMessage =
config.getBoolean(AmazonSqsConfig.DELETE_MESSAGE.key());
- }
- if (config.hasPath(TableSchemaOptions.SCHEMA.key())) {
- this.schema = config.getConfig(TableSchemaOptions.SCHEMA.key());
- }
- }
+ public static final Option<Boolean> DEBEZIUM_RECORD_INCLUDE_SCHEMA =
+ Options.key("debezium_record_include_schema")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription("Does the debezium record carry a schema.");
}
diff --git a/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/sink/AmazonSqsSinkFactory.java b/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/sink/AmazonSqsSinkFactory.java
index 030e9d221a..50ea762740 100644
--- a/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/sink/AmazonSqsSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/sink/AmazonSqsSinkFactory.java
@@ -27,8 +27,12 @@ import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
import com.google.auto.service.AutoService;
-import static
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsConfig.REGION;
-import static
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsConfig.URL;
+import static
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSinkOptions.ACCESS_KEY_ID;
+import static
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSinkOptions.FIELD_DELIMITER;
+import static
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSinkOptions.FORMAT;
+import static
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSinkOptions.REGION;
+import static
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSinkOptions.SECRET_ACCESS_KEY;
+import static
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSinkOptions.URL;
@AutoService(Factory.class)
public class AmazonSqsSinkFactory implements TableSinkFactory {
@@ -46,6 +50,9 @@ public class AmazonSqsSinkFactory implements TableSinkFactory {
@Override
public OptionRule optionRule() {
- return OptionRule.builder().required(URL, REGION).build();
+ return OptionRule.builder()
+ .required(URL, REGION)
+ .optional(ACCESS_KEY_ID, SECRET_ACCESS_KEY, FORMAT,
FIELD_DELIMITER)
+ .build();
}
}
diff --git a/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/sink/AmazonSqsSinkWriter.java b/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/sink/AmazonSqsSinkWriter.java
index eb95c95b1d..c7ba34fdc1 100644
--- a/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/sink/AmazonSqsSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/sink/AmazonSqsSinkWriter.java
@@ -41,13 +41,13 @@ import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
-import static
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsConfig.ACCESS_KEY_ID;
-import static
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsConfig.DEFAULT_FIELD_DELIMITER;
-import static
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsConfig.FIELD_DELIMITER;
-import static
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsConfig.FORMAT;
-import static
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsConfig.REGION;
-import static
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsConfig.SECRET_ACCESS_KEY;
-import static
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsConfig.URL;
+import static
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSinkOptions.ACCESS_KEY_ID;
+import static
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSinkOptions.DEFAULT_FIELD_DELIMITER;
+import static
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSinkOptions.FIELD_DELIMITER;
+import static
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSinkOptions.FORMAT;
+import static
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSinkOptions.REGION;
+import static
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSinkOptions.SECRET_ACCESS_KEY;
+import static
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSinkOptions.URL;
public class AmazonSqsSinkWriter extends AbstractSinkWriter<SeaTunnelRow,
Void> {
diff --git a/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/source/AmazonSqsSource.java b/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/source/AmazonSqsSource.java
index c4c3ad372d..ee0d078ab1 100644
--- a/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/source/AmazonSqsSource.java
+++ b/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/source/AmazonSqsSource.java
@@ -17,79 +17,41 @@
package org.apache.seatunnel.connectors.seatunnel.amazonsqs.source;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
-import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Boundedness;
-import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SupportColumnProjection;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
-import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
-import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
-import
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSourceOptions;
-import
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.MessageFormat;
-import
org.apache.seatunnel.connectors.seatunnel.amazonsqs.exception.AmazonSqsConnectorException;
+import
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSourceConfig;
import
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
import
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
import
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
-import org.apache.seatunnel.format.json.JsonDeserializationSchema;
-import org.apache.seatunnel.format.json.canal.CanalJsonDeserializationSchema;
-import
org.apache.seatunnel.format.json.debezium.DebeziumJsonDeserializationSchema;
-import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
-import org.apache.seatunnel.format.text.TextDeserializationSchema;
-import org.apache.seatunnel.format.text.constant.TextFormatConstant;
-import com.google.auto.service.AutoService;
import lombok.extern.slf4j.Slf4j;
-import static
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsConfig.DEBEZIUM_RECORD_INCLUDE_SCHEMA;
-import static
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsConfig.DEFAULT_FIELD_DELIMITER;
-import static
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsConfig.FIELD_DELIMITER;
-import static
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsConfig.FORMAT;
-import static
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsConfig.REGION;
-import static
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsConfig.URL;
+import java.util.Collections;
+import java.util.List;
@Slf4j
-@AutoService(SeaTunnelSource.class)
public class AmazonSqsSource extends AbstractSingleSplitSource<SeaTunnelRow>
implements SupportColumnProjection {
- private AmazonSqsSourceOptions amazonSqsSourceOptions;
+ private AmazonSqsSourceConfig amazonSqsSourceConfig;
private DeserializationSchema<SeaTunnelRow> deserializationSchema;
- private SeaTunnelRowType typeInfo;
private CatalogTable catalogTable;
- @Override
- public String getPluginName() {
- return "AmazonSqs";
+ public AmazonSqsSource(
+ AmazonSqsSourceConfig amazonSqsSourceConfig,
+ CatalogTable catalogTable,
+ DeserializationSchema<SeaTunnelRow> deserializationSchema) {
+ this.amazonSqsSourceConfig = amazonSqsSourceConfig;
+ this.catalogTable = catalogTable;
+ this.deserializationSchema = deserializationSchema;
}
@Override
- public void prepare(Config pluginConfig) throws PrepareFailException {
- CheckResult result =
- CheckConfigUtil.checkAllExists(
- pluginConfig, URL.key(), REGION.key(),
TableSchemaOptions.SCHEMA.key());
- if (!result.isSuccess()) {
- throw new AmazonSqsConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format(
- "PluginName: %s, PluginType: %s, Message: %s",
- getPluginName(), PluginType.SOURCE,
result.getMsg()));
- }
- this.amazonSqsSourceOptions = new AmazonSqsSourceOptions(pluginConfig);
- this.catalogTable = CatalogTableUtil.buildWithConfig(pluginConfig);
- this.typeInfo = catalogTable.getSeaTunnelRowType();
- setDeserialization(pluginConfig);
+ public String getPluginName() {
+ return "AmazonSqs";
}
@Override
@@ -98,63 +60,17 @@ public class AmazonSqsSource extends AbstractSingleSplitSource<SeaTunnelRow>
}
@Override
- public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
- return this.typeInfo;
+ public List<CatalogTable> getProducedCatalogTables() {
+ return Collections.singletonList(catalogTable);
}
@Override
public AbstractSingleSplitReader<SeaTunnelRow> createReader(
SingleSplitReaderContext readerContext) throws Exception {
return new AmazonSqsSourceReader(
- readerContext, amazonSqsSourceOptions, deserializationSchema,
typeInfo);
- }
-
- private void setDeserialization(Config config) {
- if (config.hasPath(TableSchemaOptions.SCHEMA.key())) {
- MessageFormat format =
ReadonlyConfig.fromConfig(config).get(FORMAT);
- switch (format) {
- case JSON:
- deserializationSchema =
- new JsonDeserializationSchema(catalogTable, false,
false);
- break;
- case TEXT:
- String delimiter = DEFAULT_FIELD_DELIMITER;
- if (config.hasPath(FIELD_DELIMITER.key())) {
- delimiter = config.getString(FIELD_DELIMITER.key());
- }
- deserializationSchema =
- TextDeserializationSchema.builder()
- .seaTunnelRowType(typeInfo)
- .delimiter(delimiter)
- .build();
- break;
- case CANAL_JSON:
- deserializationSchema =
-
CanalJsonDeserializationSchema.builder(catalogTable)
- .setIgnoreParseErrors(true)
- .build();
- break;
- case DEBEZIUM_JSON:
- boolean includeSchema =
DEBEZIUM_RECORD_INCLUDE_SCHEMA.defaultValue();
- if (config.hasPath(DEBEZIUM_RECORD_INCLUDE_SCHEMA.key())) {
- includeSchema =
config.getBoolean(DEBEZIUM_RECORD_INCLUDE_SCHEMA.key());
- }
- deserializationSchema =
- new DebeziumJsonDeserializationSchema(
- catalogTable, true, includeSchema);
- break;
- default:
- throw new SeaTunnelJsonFormatException(
- CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
- "Unsupported format: " + format);
- }
- } else {
- typeInfo = CatalogTableUtil.buildSimpleTextSchema();
- this.deserializationSchema =
- TextDeserializationSchema.builder()
- .seaTunnelRowType(typeInfo)
- .delimiter(TextFormatConstant.PLACEHOLDER)
- .build();
- }
+ readerContext,
+ amazonSqsSourceConfig,
+ deserializationSchema,
+ catalogTable.getSeaTunnelRowType());
}
}
diff --git a/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/source/AmazonSqsSourceFactory.java b/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/source/AmazonSqsSourceFactory.java
index 4f05b0b8f6..08d631ee11 100644
--- a/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/source/AmazonSqsSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/source/AmazonSqsSourceFactory.java
@@ -17,16 +17,44 @@
package org.apache.seatunnel.connectors.seatunnel.amazonsqs.source;
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSourceConfig;
+import
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.MessageFormat;
+import org.apache.seatunnel.format.json.JsonDeserializationSchema;
+import org.apache.seatunnel.format.json.canal.CanalJsonDeserializationSchema;
+import
org.apache.seatunnel.format.json.debezium.DebeziumJsonDeserializationSchema;
+import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
+import org.apache.seatunnel.format.text.TextDeserializationSchema;
import com.google.auto.service.AutoService;
-import static
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsConfig.REGION;
-import static
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsConfig.URL;
+import java.io.Serializable;
+
+import static
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSourceOptions.ACCESS_KEY_ID;
+import static
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSourceOptions.DEBEZIUM_RECORD_INCLUDE_SCHEMA;
+import static
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSourceOptions.DEFAULT_FIELD_DELIMITER;
+import static
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSourceOptions.DELETE_MESSAGE;
+import static
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSourceOptions.FIELD_DELIMITER;
+import static
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSourceOptions.FORMAT;
+import static
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSourceOptions.MESSAGE_GROUP_ID;
+import static
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSourceOptions.REGION;
+import static
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSourceOptions.SCHEMA;
+import static
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSourceOptions.SECRET_ACCESS_KEY;
+import static
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSourceOptions.URL;
@AutoService(Factory.class)
public class AmazonSqsSourceFactory implements TableSourceFactory {
@@ -37,11 +65,76 @@ public class AmazonSqsSourceFactory implements TableSourceFactory {
@Override
public OptionRule optionRule() {
- return OptionRule.builder().required(URL, REGION,
TableSchemaOptions.SCHEMA).build();
+ return OptionRule.builder()
+ .required(URL, REGION, SCHEMA)
+ .optional(
+ ACCESS_KEY_ID,
+ SECRET_ACCESS_KEY,
+ MESSAGE_GROUP_ID,
+ DELETE_MESSAGE,
+ FORMAT,
+ FIELD_DELIMITER,
+ DEBEZIUM_RECORD_INCLUDE_SCHEMA)
+ .build();
+ }
+
+ @Override
+ public <T, SplitT extends SourceSplit, StateT extends Serializable>
+ TableSource<T, SplitT, StateT>
createSource(TableSourceFactoryContext context) {
+ CatalogTable catalogTable =
CatalogTableUtil.buildWithConfig(context.getOptions());
+ DeserializationSchema<SeaTunnelRow> deserializationSchema =
+ setDeserialization(context.getOptions().toConfig(),
catalogTable);
+ return () ->
+ (SeaTunnelSource<T, SplitT, StateT>)
+ new AmazonSqsSource(
+ new
AmazonSqsSourceConfig(context.getOptions()),
+ catalogTable,
+ deserializationSchema);
}
@Override
public Class<? extends SeaTunnelSource> getSourceClass() {
return AmazonSqsSource.class;
}
+
+ private DeserializationSchema<SeaTunnelRow> setDeserialization(
+ Config config, CatalogTable catalogTable) {
+ DeserializationSchema<SeaTunnelRow> deserializationSchema;
+ MessageFormat format = ReadonlyConfig.fromConfig(config).get(FORMAT);
+ switch (format) {
+ case JSON:
+ deserializationSchema = new
JsonDeserializationSchema(catalogTable, false, false);
+ break;
+ case TEXT:
+ String delimiter = DEFAULT_FIELD_DELIMITER;
+ if (config.hasPath(FIELD_DELIMITER.key())) {
+ delimiter = config.getString(FIELD_DELIMITER.key());
+ }
+ deserializationSchema =
+ TextDeserializationSchema.builder()
+
.seaTunnelRowType(catalogTable.getSeaTunnelRowType())
+ .delimiter(delimiter)
+ .build();
+ break;
+ case CANAL_JSON:
+ deserializationSchema =
+ CanalJsonDeserializationSchema.builder(catalogTable)
+ .setIgnoreParseErrors(true)
+ .build();
+ break;
+ case DEBEZIUM_JSON:
+ boolean includeSchema =
DEBEZIUM_RECORD_INCLUDE_SCHEMA.defaultValue();
+ if (config.hasPath(DEBEZIUM_RECORD_INCLUDE_SCHEMA.key())) {
+ includeSchema =
config.getBoolean(DEBEZIUM_RECORD_INCLUDE_SCHEMA.key());
+ }
+ deserializationSchema =
+ new DebeziumJsonDeserializationSchema(catalogTable,
true, includeSchema);
+ break;
+ default:
+ throw new SeaTunnelJsonFormatException(
+ CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
+ "Unsupported format: " + format);
+ }
+ return deserializationSchema;
+ }
}
diff --git a/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/source/AmazonSqsSourceReader.java b/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/source/AmazonSqsSourceReader.java
index bc28cf7b8e..309b328ee1 100644
--- a/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/source/AmazonSqsSourceReader.java
+++ b/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/source/AmazonSqsSourceReader.java
@@ -21,7 +21,7 @@ import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSourceOptions;
+import
org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSourceConfig;
import
org.apache.seatunnel.connectors.seatunnel.amazonsqs.deserialize.AmazonSqsDeserializer;
import
org.apache.seatunnel.connectors.seatunnel.amazonsqs.deserialize.SeaTunnelRowDeserializer;
import
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
@@ -47,40 +47,40 @@ public class AmazonSqsSourceReader extends AbstractSingleSplitReader<SeaTunnelRo
protected SqsClient sqsClient;
protected SingleSplitReaderContext context;
- protected AmazonSqsSourceOptions amazonSqsSourceOptions;
+ protected AmazonSqsSourceConfig amazonSqsSourceConfig;
private final SeaTunnelRowDeserializer seaTunnelRowDeserializer;
public AmazonSqsSourceReader(
SingleSplitReaderContext context,
- AmazonSqsSourceOptions amazonSqsSourceOptions,
+ AmazonSqsSourceConfig amazonSqsSourceConfig,
DeserializationSchema<SeaTunnelRow> deserializationSchema,
SeaTunnelRowType seaTunnelRowType) {
this.context = context;
- this.amazonSqsSourceOptions = amazonSqsSourceOptions;
+ this.amazonSqsSourceConfig = amazonSqsSourceConfig;
this.seaTunnelRowDeserializer = new
AmazonSqsDeserializer(deserializationSchema);
}
@Override
public void open() throws Exception {
- if (amazonSqsSourceOptions.getAccessKeyId() != null
- & amazonSqsSourceOptions.getSecretAccessKey() != null) {
+ if (amazonSqsSourceConfig.getAccessKeyId() != null
+ & amazonSqsSourceConfig.getSecretAccessKey() != null) {
sqsClient =
SqsClient.builder()
-
.endpointOverride(URI.create(amazonSqsSourceOptions.getUrl()))
+
.endpointOverride(URI.create(amazonSqsSourceConfig.getUrl()))
// The region is meaningless for local Sqs but
required for client
// builder validation
-
.region(Region.of(amazonSqsSourceOptions.getRegion()))
+
.region(Region.of(amazonSqsSourceConfig.getRegion()))
.credentialsProvider(
StaticCredentialsProvider.create(
AwsBasicCredentials.create(
-
amazonSqsSourceOptions.getAccessKeyId(),
-
amazonSqsSourceOptions.getSecretAccessKey())))
+
amazonSqsSourceConfig.getAccessKeyId(),
+
amazonSqsSourceConfig.getSecretAccessKey())))
.build();
} else {
sqsClient =
SqsClient.builder()
-
.endpointOverride(URI.create(amazonSqsSourceOptions.getUrl()))
-
.region(Region.of(amazonSqsSourceOptions.getRegion()))
+
.endpointOverride(URI.create(amazonSqsSourceConfig.getUrl()))
+
.region(Region.of(amazonSqsSourceConfig.getRegion()))
.credentialsProvider(DefaultCredentialsProvider.create())
.build();
}
@@ -96,7 +96,7 @@ public class AmazonSqsSourceReader extends AbstractSingleSplitReader<SeaTunnelRo
public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
ReceiveMessageRequest receiveMessageRequest =
ReceiveMessageRequest.builder()
- .queueUrl(amazonSqsSourceOptions.getUrl())
+ .queueUrl(amazonSqsSourceConfig.getUrl())
.maxNumberOfMessages(10) // Adjust the batch size as
needed
.waitTimeSeconds(10) // Adjust the wait time as needed
.build();
@@ -110,10 +110,10 @@ public class AmazonSqsSourceReader extends AbstractSingleSplitReader<SeaTunnelRo
output.collect(seaTunnelRow);
// Delete the processed message
- if (amazonSqsSourceOptions.isDeleteMessage()) {
+ if (amazonSqsSourceConfig.isDeleteMessage()) {
DeleteMessageRequest deleteMessageRequest =
DeleteMessageRequest.builder()
- .queueUrl(amazonSqsSourceOptions.getUrl())
+ .queueUrl(amazonSqsSourceConfig.getUrl())
.receiptHandle(message.receiptHandle())
.build();
sqsClient.deleteMessage(deleteMessageRequest);