This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new a69efca0fd [improve] update amazondynamodb connector (#8601)
a69efca0fd is described below
commit a69efca0fdca28bdb46afde395e0bea7c404c82b
Author: Jarvis <[email protected]>
AuthorDate: Fri Feb 7 14:01:26 2025 +0800
[improve] update amazondynamodb connector (#8601)
---
.../seatunnel/api/ConnectorOptionCheckTest.java | 1 -
...BConfig.java => AmazonDynamoDBBaseOptions.java} | 28 +-------
.../config/AmazonDynamoDBConfig.java | 84 +++++++++-------------
.../config/AmazonDynamoDBSinkOptions.java | 30 ++++++++
.../config/AmazonDynamoDBSourceOptions.java | 62 ++++------------
.../serialize/DefaultSeaTunnelRowSerializer.java | 11 ++-
.../amazondynamodb/sink/AmazonDynamoDBSink.java | 62 ++++------------
.../sink/AmazonDynamoDBSinkFactory.java | 25 ++++---
.../amazondynamodb/sink/AmazonDynamoDBWriter.java | 10 ++-
.../amazondynamodb/sink/DynamoDbSinkClient.java | 22 +++---
.../source/AmazonDynamoDBSource.java | 67 +++++------------
.../source/AmazonDynamoDBSourceFactory.java | 41 +++++++----
.../source/AmazonDynamoDBSourceReader.java | 18 ++---
.../AmazonDynamoDBSourceSplitEnumerator.java | 16 ++---
14 files changed, 190 insertions(+), 287 deletions(-)
diff --git
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
index 88f4f69a29..a473f0f685 100644
---
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
+++
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
@@ -224,7 +224,6 @@ public class ConnectorOptionCheckTest {
whiteList.add("RocketMqSourceOptions");
whiteList.add("TablestoreSinkOptions");
whiteList.add("TableStoreDBSourceOptions");
- whiteList.add("AmazonDynamoDBSinkOptions");
whiteList.add("KuduSinkOptions");
whiteList.add("TDengineSinkOptions");
whiteList.add("Neo4jSourceOptions");
diff --git
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBConfig.java
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBBaseOptions.java
similarity index 65%
copy from
seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBConfig.java
copy to
seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBBaseOptions.java
index 87d69c74c1..4507e7c46d 100644
---
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBConfig.java
+++
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBBaseOptions.java
@@ -22,7 +22,7 @@ import org.apache.seatunnel.api.configuration.Options;
import java.io.Serializable;
-public class AmazonDynamoDBConfig implements Serializable {
+public class AmazonDynamoDBBaseOptions implements Serializable {
public static final Option<String> URL =
Options.key("url")
.stringType()
@@ -48,30 +48,4 @@ public class AmazonDynamoDBConfig implements Serializable {
.stringType()
.noDefaultValue()
.withDescription("The table of Amazon DynamoDB");
-
- public static final Option<Integer> BATCH_SIZE =
- Options.key("batch_size")
- .intType()
- .defaultValue(25)
- .withDescription("The batch size of Amazon DynamoDB");
-
- public static final Option<Integer> BATCH_INTERVAL_MS =
- Options.key("batch_interval_ms")
- .intType()
- .defaultValue(1000)
- .withDescription("The batch interval of Amazon DynamoDB");
-
- @SuppressWarnings("checkstyle:MagicNumber")
- public static final Option<Integer> SCAN_ITEM_LIMIT =
- Options.key("scan_item_limit")
- .intType()
- .defaultValue(1)
- .withDescription("number of item each scan request should
return");
-
- @SuppressWarnings("checkstyle:MagicNumber")
- public static final Option<Integer> PARALLEL_SCAN_THREADS =
- Options.key("parallel_scan_threads")
- .intType()
- .defaultValue(2)
- .withDescription("number of logical segments for parallel
scan");
}
diff --git
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBConfig.java
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBConfig.java
index 87d69c74c1..0c72819776 100644
---
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBConfig.java
+++
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBConfig.java
@@ -17,61 +17,47 @@
package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config;
-import org.apache.seatunnel.api.configuration.Option;
-import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
import java.io.Serializable;
+@Data
+@AllArgsConstructor
public class AmazonDynamoDBConfig implements Serializable {
- public static final Option<String> URL =
- Options.key("url")
- .stringType()
- .noDefaultValue()
- .withDescription("url to read to Amazon DynamoDB");
- public static final Option<String> REGION =
- Options.key("region")
- .stringType()
- .noDefaultValue()
- .withDescription("The region of Amazon DynamoDB");
- public static final Option<String> ACCESS_KEY_ID =
- Options.key("access_key_id")
- .stringType()
- .noDefaultValue()
- .withDescription("The access id of Amazon DynamoDB");
- public static final Option<String> SECRET_ACCESS_KEY =
- Options.key("secret_access_key")
- .stringType()
- .noDefaultValue()
- .withDescription("The access secret key of Amazon
DynamoDB");
- public static final Option<String> TABLE =
- Options.key("table")
- .stringType()
- .noDefaultValue()
- .withDescription("The table of Amazon DynamoDB");
- public static final Option<Integer> BATCH_SIZE =
- Options.key("batch_size")
- .intType()
- .defaultValue(25)
- .withDescription("The batch size of Amazon DynamoDB");
+ private String url;
+
+ private String region;
+
+ private String accessKeyId;
+
+ private String secretAccessKey;
+
+ private String table;
- public static final Option<Integer> BATCH_INTERVAL_MS =
- Options.key("batch_interval_ms")
- .intType()
- .defaultValue(1000)
- .withDescription("The batch interval of Amazon DynamoDB");
+ private Config schema;
- @SuppressWarnings("checkstyle:MagicNumber")
- public static final Option<Integer> SCAN_ITEM_LIMIT =
- Options.key("scan_item_limit")
- .intType()
- .defaultValue(1)
- .withDescription("number of item each scan request should
return");
+ public int batchSize;
+ public int scanItemLimit;
+ public int parallelScanThreads;
- @SuppressWarnings("checkstyle:MagicNumber")
- public static final Option<Integer> PARALLEL_SCAN_THREADS =
- Options.key("parallel_scan_threads")
- .intType()
- .defaultValue(2)
- .withDescription("number of logical segments for parallel
scan");
+ public AmazonDynamoDBConfig(ReadonlyConfig config) {
+ this.url = config.get(AmazonDynamoDBBaseOptions.URL);
+ this.region = config.get(AmazonDynamoDBBaseOptions.REGION);
+ this.accessKeyId = config.get(AmazonDynamoDBBaseOptions.ACCESS_KEY_ID);
+ this.secretAccessKey =
config.get(AmazonDynamoDBBaseOptions.SECRET_ACCESS_KEY);
+ this.table = config.get(AmazonDynamoDBBaseOptions.TABLE);
+ if (config.getOptional(TableSchemaOptions.SCHEMA).isPresent()) {
+ this.schema =
ReadonlyConfig.fromMap(config.get(TableSchemaOptions.SCHEMA)).toConfig();
+ }
+ this.batchSize = config.get(AmazonDynamoDBSinkOptions.BATCH_SIZE);
+ this.scanItemLimit =
config.get(AmazonDynamoDBSourceOptions.SCAN_ITEM_LIMIT);
+ this.parallelScanThreads =
config.get(AmazonDynamoDBSourceOptions.PARALLEL_SCAN_THREADS);
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBSinkOptions.java
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBSinkOptions.java
new file mode 100644
index 0000000000..48aebf8be1
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBSinkOptions.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+public class AmazonDynamoDBSinkOptions extends AmazonDynamoDBBaseOptions {
+
+ public static final Option<Integer> BATCH_SIZE =
+ Options.key("batch_size")
+ .intType()
+ .defaultValue(25)
+ .withDescription("The batch size of Amazon DynamoDB");
+}
diff --git
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBSourceOptions.java
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBSourceOptions.java
index bd4f32b08f..57d44adb0a 100644
---
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBSourceOptions.java
+++
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBSourceOptions.java
@@ -17,57 +17,25 @@
package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
-import lombok.AllArgsConstructor;
-import lombok.Data;
-
-import java.io.Serializable;
-
-@Data
-@AllArgsConstructor
-public class AmazonDynamoDBSourceOptions implements Serializable {
-
- private String url;
-
- private String region;
-
- private String accessKeyId;
-
- private String secretAccessKey;
+import java.util.Map;
- private String table;
+public class AmazonDynamoDBSourceOptions extends AmazonDynamoDBBaseOptions {
- private Config schema;
+ public static final Option<Integer> SCAN_ITEM_LIMIT =
+ Options.key("scan_item_limit")
+ .intType()
+ .defaultValue(1)
+ .withDescription("number of item each scan request should
return");
- public int batchSize = AmazonDynamoDBConfig.BATCH_SIZE.defaultValue();
- public int batchIntervalMs =
AmazonDynamoDBConfig.BATCH_INTERVAL_MS.defaultValue();
- public int scanItemLimit =
AmazonDynamoDBConfig.SCAN_ITEM_LIMIT.defaultValue();
- public int parallelScanThreads =
AmazonDynamoDBConfig.PARALLEL_SCAN_THREADS.defaultValue();
+ public static final Option<Integer> PARALLEL_SCAN_THREADS =
+ Options.key("parallel_scan_threads")
+ .intType()
+ .defaultValue(2)
+ .withDescription("number of logical segments for parallel
scan");
- public AmazonDynamoDBSourceOptions(Config config) {
- this.url = config.getString(AmazonDynamoDBConfig.URL.key());
- this.region = config.getString(AmazonDynamoDBConfig.REGION.key());
- this.accessKeyId =
config.getString(AmazonDynamoDBConfig.ACCESS_KEY_ID.key());
- this.secretAccessKey =
config.getString(AmazonDynamoDBConfig.SECRET_ACCESS_KEY.key());
- this.table = config.getString(AmazonDynamoDBConfig.TABLE.key());
- if (config.hasPath(TableSchemaOptions.SCHEMA.key())) {
- this.schema = config.getConfig(TableSchemaOptions.SCHEMA.key());
- }
- if (config.hasPath(AmazonDynamoDBConfig.BATCH_SIZE.key())) {
- this.batchSize =
config.getInt(AmazonDynamoDBConfig.BATCH_SIZE.key());
- }
- if (config.hasPath(AmazonDynamoDBConfig.BATCH_INTERVAL_MS.key())) {
- this.batchIntervalMs =
config.getInt(AmazonDynamoDBConfig.BATCH_INTERVAL_MS.key());
- }
- if (config.hasPath(AmazonDynamoDBConfig.SCAN_ITEM_LIMIT.key())) {
- this.scanItemLimit =
config.getInt(AmazonDynamoDBConfig.SCAN_ITEM_LIMIT.key());
- }
- if (config.hasPath(AmazonDynamoDBConfig.PARALLEL_SCAN_THREADS.key())) {
- this.parallelScanThreads =
-
config.getInt(AmazonDynamoDBConfig.PARALLEL_SCAN_THREADS.key());
- }
- }
+ public static final Option<Map<String, Object>> SCHEMA =
TableSchemaOptions.SCHEMA;
}
diff --git
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/serialize/DefaultSeaTunnelRowSerializer.java
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/serialize/DefaultSeaTunnelRowSerializer.java
index 5cef8d2bc6..6d31eecca6 100644
---
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/serialize/DefaultSeaTunnelRowSerializer.java
+++
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/serialize/DefaultSeaTunnelRowSerializer.java
@@ -23,7 +23,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonError;
-import
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSourceOptions;
+import
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
@@ -40,14 +40,13 @@ import java.util.stream.Stream;
public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer {
private final SeaTunnelRowType seaTunnelRowType;
- private final AmazonDynamoDBSourceOptions amazondynamodbSourceOptions;
+ private final AmazonDynamoDBConfig amazondynamodbConfig;
private final List<AttributeValue.Type> measurementsType;
public DefaultSeaTunnelRowSerializer(
- SeaTunnelRowType seaTunnelRowType,
- AmazonDynamoDBSourceOptions amazondynamodbSourceOptions) {
+ SeaTunnelRowType seaTunnelRowType, AmazonDynamoDBConfig
amazondynamodbConfig) {
this.seaTunnelRowType = seaTunnelRowType;
- this.amazondynamodbSourceOptions = amazondynamodbSourceOptions;
+ this.amazondynamodbConfig = amazondynamodbConfig;
this.measurementsType = convertTypes(seaTunnelRowType);
}
@@ -65,7 +64,7 @@ public class DefaultSeaTunnelRowSerializer implements
SeaTunnelRowSerializer {
measurementsType.get(index)));
}
return PutItemRequest.builder()
- .tableName(amazondynamodbSourceOptions.getTable())
+ .tableName(amazondynamodbConfig.getTable())
.item(itemValues)
.build();
}
diff --git
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBSink.java
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBSink.java
index 68dcc84a42..b9a27a7c29 100644
---
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBSink.java
+++
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBSink.java
@@ -17,79 +17,41 @@
package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.sink;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
-import
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSourceOptions;
-import
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.exception.AmazonDynamoDBConnectorException;
+import
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
-import com.google.auto.service.AutoService;
-
import java.io.IOException;
import java.util.Optional;
-import static
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.ACCESS_KEY_ID;
-import static
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.REGION;
-import static
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.SECRET_ACCESS_KEY;
-import static
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.TABLE;
-import static
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.URL;
-
-@AutoService(SeaTunnelSink.class)
public class AmazonDynamoDBSink extends AbstractSimpleSink<SeaTunnelRow, Void>
{
- private SeaTunnelRowType rowType;
+ private CatalogTable catalogTable;
- private AmazonDynamoDBSourceOptions amazondynamodbSourceOptions;
+ private AmazonDynamoDBConfig amazondynamodbConfig;
- @Override
- public String getPluginName() {
- return "AmazonDynamodb";
+ public AmazonDynamoDBSink(
+ CatalogTable catalogTable, AmazonDynamoDBConfig
amazondynamodbConfig) {
+ this.catalogTable = catalogTable;
+ this.amazondynamodbConfig = amazondynamodbConfig;
}
@Override
- public void prepare(Config pluginConfig) throws PrepareFailException {
- CheckResult result =
- CheckConfigUtil.checkAllExists(
- pluginConfig,
- URL.key(),
- TABLE.key(),
- REGION.key(),
- ACCESS_KEY_ID.key(),
- SECRET_ACCESS_KEY.key());
- if (!result.isSuccess()) {
- throw new AmazonDynamoDBConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format(
- "PluginName: %s, PluginType: %s, Message: %s",
- getPluginName(), PluginType.SINK,
result.getMsg()));
- }
- amazondynamodbSourceOptions = new
AmazonDynamoDBSourceOptions(pluginConfig);
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return Optional.of(catalogTable);
}
@Override
- public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
- this.rowType = seaTunnelRowType;
+ public String getPluginName() {
+ return "AmazonDynamodb";
}
@Override
public AbstractSinkWriter<SeaTunnelRow, Void>
createWriter(SinkWriter.Context context)
throws IOException {
- return new AmazonDynamoDBWriter(amazondynamodbSourceOptions, rowType);
- }
-
- @Override
- public Optional<CatalogTable> getWriteCatalogTable() {
- return super.getWriteCatalogTable();
+ return new AmazonDynamoDBWriter(amazondynamodbConfig,
catalogTable.getSeaTunnelRowType());
}
}
diff --git
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBSinkFactory.java
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBSinkFactory.java
index 14aaf5fc87..715da7c1b1 100644
---
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBSinkFactory.java
@@ -18,18 +18,20 @@
package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.sink;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig;
import com.google.auto.service.AutoService;
-import static
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.ACCESS_KEY_ID;
-import static
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.BATCH_INTERVAL_MS;
-import static
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.BATCH_SIZE;
-import static
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.REGION;
-import static
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.SECRET_ACCESS_KEY;
-import static
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.TABLE;
-import static
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.URL;
+import static
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSinkOptions.ACCESS_KEY_ID;
+import static
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSinkOptions.BATCH_SIZE;
+import static
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSinkOptions.REGION;
+import static
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSinkOptions.SECRET_ACCESS_KEY;
+import static
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSinkOptions.TABLE;
+import static
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSinkOptions.URL;
@AutoService(Factory.class)
public class AmazonDynamoDBSinkFactory implements TableSinkFactory {
@@ -42,7 +44,14 @@ public class AmazonDynamoDBSinkFactory implements
TableSinkFactory {
public OptionRule optionRule() {
return OptionRule.builder()
.required(URL, REGION, ACCESS_KEY_ID, SECRET_ACCESS_KEY, TABLE)
- .optional(BATCH_SIZE, BATCH_INTERVAL_MS)
+ .optional(BATCH_SIZE)
.build();
}
+
+ @Override
+ public TableSink createSink(TableSinkFactoryContext context) {
+ return () ->
+ new AmazonDynamoDBSink(
+ context.getCatalogTable(), new
AmazonDynamoDBConfig(context.getOptions()));
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBWriter.java
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBWriter.java
index aa27a4b714..f7e39b11a2 100644
---
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBWriter.java
+++
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBWriter.java
@@ -19,7 +19,7 @@ package
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.sink;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSourceOptions;
+import
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig;
import
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.serialize.DefaultSeaTunnelRowSerializer;
import
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.serialize.SeaTunnelRowSerializer;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
@@ -33,11 +33,9 @@ public class AmazonDynamoDBWriter extends
AbstractSinkWriter<SeaTunnelRow, Void>
private final SeaTunnelRowSerializer serializer;
public AmazonDynamoDBWriter(
- AmazonDynamoDBSourceOptions amazondynamodbSourceOptions,
- SeaTunnelRowType seaTunnelRowType) {
- dynamoDbSinkClient = new
DynamoDbSinkClient(amazondynamodbSourceOptions);
- serializer =
- new DefaultSeaTunnelRowSerializer(seaTunnelRowType,
amazondynamodbSourceOptions);
+ AmazonDynamoDBConfig amazondynamodbConfig, SeaTunnelRowType
seaTunnelRowType) {
+ dynamoDbSinkClient = new DynamoDbSinkClient(amazondynamodbConfig);
+ serializer = new DefaultSeaTunnelRowSerializer(seaTunnelRowType,
amazondynamodbConfig);
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/DynamoDbSinkClient.java
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/DynamoDbSinkClient.java
index b12ba15d9d..29ac8b7d4f 100644
---
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/DynamoDbSinkClient.java
+++
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/DynamoDbSinkClient.java
@@ -17,7 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.sink;
-import
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSourceOptions;
+import
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
@@ -35,13 +35,13 @@ import java.util.List;
import java.util.Map;
public class DynamoDbSinkClient {
- private final AmazonDynamoDBSourceOptions amazondynamodbSourceOptions;
+ private final AmazonDynamoDBConfig amazondynamodbConfig;
private volatile boolean initialize;
private DynamoDbClient dynamoDbClient;
private final List<WriteRequest> batchList;
- public DynamoDbSinkClient(AmazonDynamoDBSourceOptions
amazondynamodbSourceOptions) {
- this.amazondynamodbSourceOptions = amazondynamodbSourceOptions;
+ public DynamoDbSinkClient(AmazonDynamoDBConfig amazondynamodbConfig) {
+ this.amazondynamodbConfig = amazondynamodbConfig;
this.batchList = new ArrayList<>();
}
@@ -51,15 +51,15 @@ public class DynamoDbSinkClient {
}
dynamoDbClient =
DynamoDbClient.builder()
-
.endpointOverride(URI.create(amazondynamodbSourceOptions.getUrl()))
+
.endpointOverride(URI.create(amazondynamodbConfig.getUrl()))
// The region is meaningless for local DynamoDb but
required for client
// builder validation
-
.region(Region.of(amazondynamodbSourceOptions.getRegion()))
+ .region(Region.of(amazondynamodbConfig.getRegion()))
.credentialsProvider(
StaticCredentialsProvider.create(
AwsBasicCredentials.create(
-
amazondynamodbSourceOptions.getAccessKeyId(),
-
amazondynamodbSourceOptions.getSecretAccessKey())))
+
amazondynamodbConfig.getAccessKeyId(),
+
amazondynamodbConfig.getSecretAccessKey())))
.build();
initialize = true;
}
@@ -70,8 +70,8 @@ public class DynamoDbSinkClient {
WriteRequest.builder()
.putRequest(PutRequest.builder().item(putItemRequest.item()).build())
.build());
- if (amazondynamodbSourceOptions.getBatchSize() > 0
- && batchList.size() >=
amazondynamodbSourceOptions.getBatchSize()) {
+ if (amazondynamodbConfig.getBatchSize() > 0
+ && batchList.size() >= amazondynamodbConfig.getBatchSize()) {
flush();
}
}
@@ -88,7 +88,7 @@ public class DynamoDbSinkClient {
return;
}
Map<String, List<WriteRequest>> requestItems = new HashMap<>(1);
- requestItems.put(amazondynamodbSourceOptions.getTable(), batchList);
+ requestItems.put(amazondynamodbConfig.getTable(), batchList);
dynamoDbClient.batchWriteItem(
BatchWriteItemRequest.builder().requestItems(requestItems).build());
diff --git
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSource.java
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSource.java
index e0175b4da7..12f7f8435d 100644
---
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSource.java
+++
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSource.java
@@ -17,83 +17,50 @@
package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.source;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.source.SupportColumnProjection;
import org.apache.seatunnel.api.source.SupportParallelism;
-import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
-import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
-import
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSourceOptions;
-import
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.exception.AmazonDynamoDBConnectorException;
+import
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig;
-import com.google.auto.service.AutoService;
import lombok.extern.slf4j.Slf4j;
-import static
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.ACCESS_KEY_ID;
-import static
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.REGION;
-import static
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.SECRET_ACCESS_KEY;
-import static
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.TABLE;
-import static
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.URL;
+import java.util.Collections;
+import java.util.List;
@Slf4j
-@AutoService(SeaTunnelSource.class)
public class AmazonDynamoDBSource
implements SeaTunnelSource<
SeaTunnelRow, AmazonDynamoDBSourceSplit,
AmazonDynamoDBSourceState>,
SupportParallelism,
SupportColumnProjection {
- private AmazonDynamoDBSourceOptions amazondynamodbSourceOptions;
+ private AmazonDynamoDBConfig amazondynamodbConfig;
+ private CatalogTable catalogTable;
- private SeaTunnelRowType typeInfo;
+ public AmazonDynamoDBSource(
+ AmazonDynamoDBConfig amazondynamodbConfig, CatalogTable catalogTable) {
+ this.amazondynamodbConfig = amazondynamodbConfig;
+ this.catalogTable = catalogTable;
+ }
@Override
public String getPluginName() {
return "AmazonDynamodb";
}
- @Override
- public void prepare(Config pluginConfig) throws PrepareFailException {
- CheckResult result =
- CheckConfigUtil.checkAllExists(
- pluginConfig,
- URL.key(),
- TABLE.key(),
- REGION.key(),
- ACCESS_KEY_ID.key(),
- SECRET_ACCESS_KEY.key(),
- TableSchemaOptions.SCHEMA.key());
- if (!result.isSuccess()) {
- throw new AmazonDynamoDBConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format(
- "PluginName: %s, PluginType: %s, Message: %s",
- getPluginName(), PluginType.SOURCE, result.getMsg()));
- }
- amazondynamodbSourceOptions = new AmazonDynamoDBSourceOptions(pluginConfig);
- typeInfo = CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
- }
-
@Override
public Boundedness getBoundedness() {
return Boundedness.BOUNDED;
}
@Override
- public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
- return this.typeInfo;
+ public List<CatalogTable> getProducedCatalogTables() {
+ return Collections.singletonList(catalogTable);
}
@Override
@@ -101,8 +68,7 @@ public class AmazonDynamoDBSource
createEnumerator(
SourceSplitEnumerator.Context<AmazonDynamoDBSourceSplit> enumeratorContext)
throws Exception {
- return new AmazonDynamoDBSourceSplitEnumerator(
- enumeratorContext, amazondynamodbSourceOptions);
+ return new AmazonDynamoDBSourceSplitEnumerator(enumeratorContext, amazondynamodbConfig);
}
@Override
@@ -112,12 +78,13 @@ public class AmazonDynamoDBSource
AmazonDynamoDBSourceState checkpointState)
throws Exception {
return new AmazonDynamoDBSourceSplitEnumerator(
- enumeratorContext, amazondynamodbSourceOptions, checkpointState);
+ enumeratorContext, amazondynamodbConfig, checkpointState);
}
@Override
public SourceReader<SeaTunnelRow, AmazonDynamoDBSourceSplit> createReader(
SourceReader.Context readerContext) throws Exception {
- return new AmazonDynamoDBSourceReader(readerContext, amazondynamodbSourceOptions, typeInfo);
+ return new AmazonDynamoDBSourceReader(
+ readerContext, amazondynamodbConfig, catalogTable.getSeaTunnelRowType());
}
}
diff --git a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceFactory.java b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceFactory.java
index 047d78c0d6..3218e5a29a 100644
--- a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceFactory.java
@@ -19,19 +19,26 @@ package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.source;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
+import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig;
import com.google.auto.service.AutoService;
-import static org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.ACCESS_KEY_ID;
-import static org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.PARALLEL_SCAN_THREADS;
-import static org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.REGION;
-import static org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.SCAN_ITEM_LIMIT;
-import static org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.SECRET_ACCESS_KEY;
-import static org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.TABLE;
-import static org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.URL;
+import java.io.Serializable;
+
+import static org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSourceOptions.ACCESS_KEY_ID;
+import static org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSourceOptions.PARALLEL_SCAN_THREADS;
+import static org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSourceOptions.REGION;
+import static org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSourceOptions.SCAN_ITEM_LIMIT;
+import static org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSourceOptions.SCHEMA;
+import static org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSourceOptions.SECRET_ACCESS_KEY;
+import static org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSourceOptions.TABLE;
+import static org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSourceOptions.URL;
@AutoService(Factory.class)
public class AmazonDynamoDBSourceFactory implements TableSourceFactory {
@@ -43,17 +50,21 @@ public class AmazonDynamoDBSourceFactory implements TableSourceFactory {
@Override
public OptionRule optionRule() {
return OptionRule.builder()
- .required(
- URL,
- REGION,
- ACCESS_KEY_ID,
- SECRET_ACCESS_KEY,
- TABLE,
- TableSchemaOptions.SCHEMA)
+ .required(URL, REGION, ACCESS_KEY_ID, SECRET_ACCESS_KEY, TABLE, SCHEMA)
.optional(SCAN_ITEM_LIMIT, PARALLEL_SCAN_THREADS)
.build();
}
+ @Override
+ public <T, SplitT extends SourceSplit, StateT extends Serializable>
+ TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
+ return () ->
+ (SeaTunnelSource<T, SplitT, StateT>)
+ new AmazonDynamoDBSource(
+ new AmazonDynamoDBConfig(context.getOptions()),
+ CatalogTableUtil.buildWithConfig(context.getOptions()));
+ }
+
@Override
public Class<? extends SeaTunnelSource> getSourceClass() {
return AmazonDynamoDBSource.class;
diff --git a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceReader.java b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceReader.java
index 3f1962a3eb..a9e72d9977 100644
--- a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceReader.java
+++ b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceReader.java
@@ -21,7 +21,7 @@ import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSourceOptions;
+import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig;
import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.serialize.DefaultSeaTunnelRowDeserializer;
import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.serialize.SeaTunnelRowDeserializer;
@@ -46,7 +46,7 @@ public class AmazonDynamoDBSourceReader
protected DynamoDbClient dynamoDbClient;
protected SourceReader.Context context;
- protected AmazonDynamoDBSourceOptions amazondynamodbSourceOptions;
+ protected AmazonDynamoDBConfig amazondynamodbConfig;
protected SeaTunnelRowDeserializer seaTunnelRowDeserializer;
Queue<AmazonDynamoDBSourceSplit> pendingSplits = new ConcurrentLinkedDeque<>();
@@ -54,10 +54,10 @@ public class AmazonDynamoDBSourceReader
public AmazonDynamoDBSourceReader(
SourceReader.Context context,
- AmazonDynamoDBSourceOptions amazondynamodbSourceOptions,
+ AmazonDynamoDBConfig amazondynamodbConfig,
SeaTunnelRowType typeInfo) {
this.context = context;
- this.amazondynamodbSourceOptions = amazondynamodbSourceOptions;
+ this.amazondynamodbConfig = amazondynamodbConfig;
this.seaTunnelRowDeserializer = new DefaultSeaTunnelRowDeserializer(typeInfo);
}
@@ -65,15 +65,15 @@ public class AmazonDynamoDBSourceReader
public void open() {
dynamoDbClient =
DynamoDbClient.builder()
- .endpointOverride(URI.create(amazondynamodbSourceOptions.getUrl()))
+ .endpointOverride(URI.create(amazondynamodbConfig.getUrl()))
// The region is meaningless for local DynamoDb but required for client
// builder validation
- .region(Region.of(amazondynamodbSourceOptions.getRegion()))
+ .region(Region.of(amazondynamodbConfig.getRegion()))
.credentialsProvider(
StaticCredentialsProvider.create(
AwsBasicCredentials.create(
- amazondynamodbSourceOptions.getAccessKeyId(),
- amazondynamodbSourceOptions.getSecretAccessKey())))
+ amazondynamodbConfig.getAccessKeyId(),
+ amazondynamodbConfig.getSecretAccessKey())))
.build();
}
@@ -124,7 +124,7 @@ public class AmazonDynamoDBSourceReader
ScanIterable scan;
ScanRequest scanRequest =
ScanRequest.builder()
- .tableName(amazondynamodbSourceOptions.getTable())
+ .tableName(amazondynamodbConfig.getTable())
.limit(split.getItemCount())
.segment(split.getSplitId())
.totalSegments(split.getTotalSegments())
diff --git a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceSplitEnumerator.java
index 03c7e9d5d2..50099f4b54 100644
--- a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceSplitEnumerator.java
+++ b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceSplitEnumerator.java
@@ -18,7 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.source;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
-import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSourceOptions;
+import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,23 +41,23 @@ public class AmazonDynamoDBSourceSplitEnumerator
private final SourceSplitEnumerator.Context<AmazonDynamoDBSourceSplit> enumeratorContext;
private final Map<Integer, List<AmazonDynamoDBSourceSplit>> pendingSplits;
- private final AmazonDynamoDBSourceOptions amazonDynamoDBSourceOptions;
+ private final AmazonDynamoDBConfig amazonDynamoDBConfig;
private final Object stateLock = new Object();
private volatile boolean shouldEnumerate;
public AmazonDynamoDBSourceSplitEnumerator(
Context<AmazonDynamoDBSourceSplit> enumeratorContext,
- AmazonDynamoDBSourceOptions amazonDynamoDBSourceOptions) {
- this(enumeratorContext, amazonDynamoDBSourceOptions, null);
+ AmazonDynamoDBConfig amazonDynamoDBConfig) {
+ this(enumeratorContext, amazonDynamoDBConfig, null);
}
public AmazonDynamoDBSourceSplitEnumerator(
Context<AmazonDynamoDBSourceSplit> enumeratorContext,
- AmazonDynamoDBSourceOptions amazonDynamoDBSourceOptions,
+ AmazonDynamoDBConfig amazonDynamoDBConfig,
AmazonDynamoDBSourceState sourceState) {
this.enumeratorContext = enumeratorContext;
- this.amazonDynamoDBSourceOptions = amazonDynamoDBSourceOptions;
+ this.amazonDynamoDBConfig = amazonDynamoDBConfig;
this.pendingSplits = new HashMap<>();
this.shouldEnumerate = sourceState == null;
if (sourceState != null) {
@@ -119,8 +119,8 @@ public class AmazonDynamoDBSourceSplitEnumerator
private Set<AmazonDynamoDBSourceSplit> discoverySplits() {
Set<AmazonDynamoDBSourceSplit> allSplit = new HashSet<>();
- int totalSegments = amazonDynamoDBSourceOptions.parallelScanThreads;
- int itemLimit = amazonDynamoDBSourceOptions.scanItemLimit;
+ int totalSegments = amazonDynamoDBConfig.parallelScanThreads;
+ int itemLimit = amazonDynamoDBConfig.scanItemLimit;
for (int i = 0; i < totalSegments; i++) {
AmazonDynamoDBSourceSplit split =
new AmazonDynamoDBSourceSplit(i, totalSegments, itemLimit);