This is an automated email from the ASF dual-hosted git repository.
liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 740c14422d [connector-v2] add amazondynamicdb source split (#5275)
740c14422d is described below
commit 740c14422d2d58f4618438691c59d4ea97cbf8be
Author: john <[email protected]>
AuthorDate: Wed Oct 25 16:59:34 2023 +0800
[connector-v2] add amazondynamicdb source split (#5275)
Co-authored-by: hailin0 <[email protected]>
---
docs/en/connector-v2/source/AmazonDynamoDB.md | 31 ++--
.../config/AmazonDynamoDBConfig.java | 14 ++
.../config/AmazonDynamoDBSourceOptions.java | 13 ++
.../source/AmazonDynamoDBSource.java | 36 ++++-
.../source/AmazonDynamoDBSourceFactory.java | 3 +
.../source/AmazonDynamoDBSourceReader.java | 93 ++++++++---
.../source/AmazonDynamoDBSourceSplit.java | 39 +++++
.../AmazonDynamoDBSourceSplitEnumerator.java | 172 +++++++++++++++++++++
.../source/AmazonDynamoDBSourceState.java | 34 ++++
.../resources/amazondynamodbIT_source_to_sink.conf | 5 +-
10 files changed, 398 insertions(+), 42 deletions(-)
diff --git a/docs/en/connector-v2/source/AmazonDynamoDB.md
b/docs/en/connector-v2/source/AmazonDynamoDB.md
index ef5eee90e9..3261046b73 100644
--- a/docs/en/connector-v2/source/AmazonDynamoDB.md
+++ b/docs/en/connector-v2/source/AmazonDynamoDB.md
@@ -12,20 +12,22 @@ Read data from Amazon DynamoDB.
- [ ] [stream](../../concept/connector-v2-features.md)
- [ ] [exactly-once](../../concept/connector-v2-features.md)
- [ ] [column projection](../../concept/connector-v2-features.md)
-- [ ] [parallelism](../../concept/connector-v2-features.md)
+- [x] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
## Options
-| name | type | required | default value |
-|-------------------|--------|----------|---------------|
-| url | string | yes | - |
-| region | string | yes | - |
-| access_key_id | string | yes | - |
-| secret_access_key | string | yes | - |
-| table | string | yes | - |
-| schema | config | yes | - |
-| common-options | | yes | - |
+| name | type | required | default value |
+|-----------------------|--------|----------|---------------|
+| url | string | yes | - |
+| region | string | yes | - |
+| access_key_id | string | yes | - |
+| secret_access_key | string | yes | - |
+| table | string | yes | - |
+| schema | config | yes | - |
+| common-options | | yes | - |
+| scan_item_limit | int | no | 1 |
+| parallel_scan_threads | int | no | 2 |
### url [string]
@@ -69,6 +71,14 @@ schema {
Source Plugin common parameters, refer to [Source Plugin](common-options.md)
for details
+### scan_item_limit [int]
+
+The number of items each scan request should return (default: 1).
+
+### parallel_scan_threads [int]
+
+The number of logical segments for the parallel scan (default: 2).
+
## Example
```bash
@@ -106,4 +116,5 @@ Amazondynamodb {
### next version
- Add Amazon DynamoDB Source Connector
+- Add source split to Amazon DynamoDB Source Connector
diff --git
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBConfig.java
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBConfig.java
index 5194e50f7c..87d69c74c1 100644
---
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBConfig.java
+++
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBConfig.java
@@ -60,4 +60,18 @@ public class AmazonDynamoDBConfig implements Serializable {
.intType()
.defaultValue(1000)
.withDescription("The batch interval of Amazon DynamoDB");
+
+ @SuppressWarnings("checkstyle:MagicNumber")
+ public static final Option<Integer> SCAN_ITEM_LIMIT =
+ Options.key("scan_item_limit")
+ .intType()
+ .defaultValue(1)
+ .withDescription("number of item each scan request should
return");
+
+ @SuppressWarnings("checkstyle:MagicNumber")
+ public static final Option<Integer> PARALLEL_SCAN_THREADS =
+ Options.key("parallel_scan_threads")
+ .intType()
+ .defaultValue(2)
+ .withDescription("number of logical segments for parallel
scan");
}
diff --git
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBSourceOptions.java
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBSourceOptions.java
index d5183c952e..bd4f32b08f 100644
---
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBSourceOptions.java
+++
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBSourceOptions.java
@@ -43,6 +43,9 @@ public class AmazonDynamoDBSourceOptions implements
Serializable {
private Config schema;
public int batchSize = AmazonDynamoDBConfig.BATCH_SIZE.defaultValue();
+ public int batchIntervalMs =
AmazonDynamoDBConfig.BATCH_INTERVAL_MS.defaultValue();
+ public int scanItemLimit =
AmazonDynamoDBConfig.SCAN_ITEM_LIMIT.defaultValue();
+ public int parallelScanThreads =
AmazonDynamoDBConfig.PARALLEL_SCAN_THREADS.defaultValue();
public AmazonDynamoDBSourceOptions(Config config) {
this.url = config.getString(AmazonDynamoDBConfig.URL.key());
@@ -56,5 +59,15 @@ public class AmazonDynamoDBSourceOptions implements
Serializable {
if (config.hasPath(AmazonDynamoDBConfig.BATCH_SIZE.key())) {
this.batchSize =
config.getInt(AmazonDynamoDBConfig.BATCH_SIZE.key());
}
+ if (config.hasPath(AmazonDynamoDBConfig.BATCH_INTERVAL_MS.key())) {
+ this.batchIntervalMs =
config.getInt(AmazonDynamoDBConfig.BATCH_INTERVAL_MS.key());
+ }
+ if (config.hasPath(AmazonDynamoDBConfig.SCAN_ITEM_LIMIT.key())) {
+ this.scanItemLimit =
config.getInt(AmazonDynamoDBConfig.SCAN_ITEM_LIMIT.key());
+ }
+ if (config.hasPath(AmazonDynamoDBConfig.PARALLEL_SCAN_THREADS.key())) {
+ this.parallelScanThreads =
+
config.getInt(AmazonDynamoDBConfig.PARALLEL_SCAN_THREADS.key());
+ }
}
}
diff --git
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSource.java
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSource.java
index da84c4099b..e0175b4da7 100644
---
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSource.java
+++
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSource.java
@@ -23,7 +23,10 @@ import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.source.SupportColumnProjection;
+import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -34,9 +37,6 @@ import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSourceOptions;
import
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.exception.AmazonDynamoDBConnectorException;
-import
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
-import
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
-import
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
import com.google.auto.service.AutoService;
import lombok.extern.slf4j.Slf4j;
@@ -49,8 +49,11 @@ import static
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.Am
@Slf4j
@AutoService(SeaTunnelSource.class)
-public class AmazonDynamoDBSource extends
AbstractSingleSplitSource<SeaTunnelRow>
- implements SupportColumnProjection {
+public class AmazonDynamoDBSource
+ implements SeaTunnelSource<
+ SeaTunnelRow, AmazonDynamoDBSourceSplit,
AmazonDynamoDBSourceState>,
+ SupportParallelism,
+ SupportColumnProjection {
private AmazonDynamoDBSourceOptions amazondynamodbSourceOptions;
@@ -94,8 +97,27 @@ public class AmazonDynamoDBSource extends
AbstractSingleSplitSource<SeaTunnelRow
}
@Override
- public AbstractSingleSplitReader<SeaTunnelRow> createReader(
- SingleSplitReaderContext readerContext) throws Exception {
+ public SourceSplitEnumerator<AmazonDynamoDBSourceSplit,
AmazonDynamoDBSourceState>
+ createEnumerator(
+ SourceSplitEnumerator.Context<AmazonDynamoDBSourceSplit>
enumeratorContext)
+ throws Exception {
+ return new AmazonDynamoDBSourceSplitEnumerator(
+ enumeratorContext, amazondynamodbSourceOptions);
+ }
+
+ @Override
+ public SourceSplitEnumerator<AmazonDynamoDBSourceSplit,
AmazonDynamoDBSourceState>
+ restoreEnumerator(
+ SourceSplitEnumerator.Context<AmazonDynamoDBSourceSplit>
enumeratorContext,
+ AmazonDynamoDBSourceState checkpointState)
+ throws Exception {
+ return new AmazonDynamoDBSourceSplitEnumerator(
+ enumeratorContext, amazondynamodbSourceOptions,
checkpointState);
+ }
+
+ @Override
+ public SourceReader<SeaTunnelRow, AmazonDynamoDBSourceSplit> createReader(
+ SourceReader.Context readerContext) throws Exception {
return new AmazonDynamoDBSourceReader(readerContext,
amazondynamodbSourceOptions, typeInfo);
}
}
diff --git
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceFactory.java
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceFactory.java
index 994f862908..047d78c0d6 100644
---
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceFactory.java
@@ -26,7 +26,9 @@ import
org.apache.seatunnel.api.table.factory.TableSourceFactory;
import com.google.auto.service.AutoService;
import static
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.ACCESS_KEY_ID;
+import static
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.PARALLEL_SCAN_THREADS;
import static
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.REGION;
+import static
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.SCAN_ITEM_LIMIT;
import static
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.SECRET_ACCESS_KEY;
import static
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.TABLE;
import static
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.URL;
@@ -48,6 +50,7 @@ public class AmazonDynamoDBSourceFactory implements
TableSourceFactory {
SECRET_ACCESS_KEY,
TABLE,
TableSchemaOptions.SCHEMA)
+ .optional(SCAN_ITEM_LIMIT, PARALLEL_SCAN_THREADS)
.build();
}
diff --git
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceReader.java
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceReader.java
index c25f8b0e0b..7c84614e39 100644
---
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceReader.java
+++
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceReader.java
@@ -18,13 +18,12 @@
package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.source;
import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSourceOptions;
import
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.serialize.DefaultSeaTunnelRowDeserializer;
import
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.serialize.SeaTunnelRowDeserializer;
-import
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
-import
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
@@ -33,22 +32,30 @@ import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
-import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
+import software.amazon.awssdk.services.dynamodb.paginators.ScanIterable;
import java.io.IOException;
import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedDeque;
@Slf4j
-public class AmazonDynamoDBSourceReader extends
AbstractSingleSplitReader<SeaTunnelRow> {
+public class AmazonDynamoDBSourceReader
+ implements SourceReader<SeaTunnelRow, AmazonDynamoDBSourceSplit> {
protected DynamoDbClient dynamoDbClient;
- protected SingleSplitReaderContext context;
+ protected SourceReader.Context context;
protected AmazonDynamoDBSourceOptions amazondynamodbSourceOptions;
protected SeaTunnelRowDeserializer seaTunnelRowDeserializer;
+ Queue<AmazonDynamoDBSourceSplit> pendingSplits = new
ConcurrentLinkedDeque<>();
+
+ private volatile boolean noMoreSplit;
public AmazonDynamoDBSourceReader(
- SingleSplitReaderContext context,
+ SourceReader.Context context,
AmazonDynamoDBSourceOptions amazondynamodbSourceOptions,
SeaTunnelRowType typeInfo) {
this.context = context;
@@ -80,25 +87,63 @@ public class AmazonDynamoDBSourceReader extends
AbstractSingleSplitReader<SeaTun
@Override
@SuppressWarnings("magicnumber")
public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
- Map<String, AttributeValue> lastKeyEvaluated = null;
+ while (!pendingSplits.isEmpty()) {
+ synchronized (output.getCheckpointLock()) {
+ AmazonDynamoDBSourceSplit split = pendingSplits.poll();
- ScanResponse scan;
- do {
- scan =
- dynamoDbClient.scan(
- ScanRequest.builder()
-
.tableName(amazondynamodbSourceOptions.getTable())
- .exclusiveStartKey(lastKeyEvaluated)
- .build());
- if (scan.hasItems()) {
- scan.items()
- .forEach(
- item -> {
-
output.collect(seaTunnelRowDeserializer.deserialize(item));
- });
+ read(split, output);
}
- lastKeyEvaluated = scan.lastEvaluatedKey();
- } while (lastKeyEvaluated != null && !lastKeyEvaluated.isEmpty());
- context.signalNoMoreElement();
+ }
+ if (pendingSplits.isEmpty() && noMoreSplit) {
+ context.signalNoMoreElement();
+ }
+ }
+
+ @Override
+ public List<AmazonDynamoDBSourceSplit> snapshotState(long checkpointId)
throws Exception {
+ return new ArrayList<>(pendingSplits);
+ }
+
+ @Override
+ public void addSplits(List<AmazonDynamoDBSourceSplit> splits) {
+ this.pendingSplits.addAll(splits);
+ }
+
+ @Override
+ public void handleNoMoreSplits() {
+ log.info("Reader received noMoreSplit event.");
+ noMoreSplit = true;
}
+
+ private void read(AmazonDynamoDBSourceSplit split, Collector<SeaTunnelRow>
output)
+ throws Exception {
+ Map<String, AttributeValue> lastKeyEvaluated = null;
+ ScanIterable scan;
+ ScanRequest scanRequest =
+ ScanRequest.builder()
+ .tableName(amazondynamodbSourceOptions.getTable())
+ .limit(split.getItemCount())
+ .segment(split.getSplitId())
+ .totalSegments(split.getTotalSegments())
+ .build();
+ scan = dynamoDbClient.scanPaginator(scanRequest);
+ do {
+
+ scan.items()
+ .forEach(
+ item -> {
+
output.collect(seaTunnelRowDeserializer.deserialize(item));
+ });
+
+ } while (scan.iterator().hasNext() && !noMoreSplit);
+
+ if (noMoreSplit && pendingSplits.isEmpty()) {
+ // signal to the source that we have reached the end of the data.
+ log.info("Closed the bounded amazonDynamodb source");
+ context.signalNoMoreElement();
+ }
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {}
}
diff --git
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceSplit.java
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceSplit.java
new file mode 100644
index 0000000000..08a760998e
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceSplit.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.source;
+
+import org.apache.seatunnel.api.source.SourceSplit;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+
+@AllArgsConstructor
+@Getter
+@Setter
+public class AmazonDynamoDBSourceSplit implements SourceSplit {
+
+ private Integer splitId;
+ private Integer totalSegments;
+ private Integer itemCount;
+
+ @Override
+ public String splitId() {
+ return splitId.toString();
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceSplitEnumerator.java
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceSplitEnumerator.java
new file mode 100644
index 0000000000..48ac0a5bc8
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceSplitEnumerator.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.source;
+
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSourceOptions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class AmazonDynamoDBSourceSplitEnumerator
+ implements SourceSplitEnumerator<AmazonDynamoDBSourceSplit,
AmazonDynamoDBSourceState> {
+
+ private static final Logger log =
+ LoggerFactory.getLogger(AmazonDynamoDBSourceSplitEnumerator.class);
+
+ private final SourceSplitEnumerator.Context<AmazonDynamoDBSourceSplit>
enumeratorContext;
+ private final Map<Integer, List<AmazonDynamoDBSourceSplit>> pendingSplits;
+ private final AmazonDynamoDBSourceOptions amazonDynamoDBSourceOptions;
+
+ private final Object stateLock = new Object();
+ private volatile boolean shouldEnumerate;
+
+ public AmazonDynamoDBSourceSplitEnumerator(
+ Context<AmazonDynamoDBSourceSplit> enumeratorContext,
+ AmazonDynamoDBSourceOptions amazonDynamoDBSourceOptions) {
+ this(enumeratorContext, amazonDynamoDBSourceOptions, null);
+ }
+
+ public AmazonDynamoDBSourceSplitEnumerator(
+ Context<AmazonDynamoDBSourceSplit> enumeratorContext,
+ AmazonDynamoDBSourceOptions amazonDynamoDBSourceOptions,
+ AmazonDynamoDBSourceState sourceState) {
+ this.enumeratorContext = enumeratorContext;
+ this.amazonDynamoDBSourceOptions = amazonDynamoDBSourceOptions;
+ this.pendingSplits = new HashMap<>();
+ this.shouldEnumerate = sourceState == null;
+ if (sourceState != null) {
+ this.shouldEnumerate = sourceState.isShouldEnumerate();
+ this.pendingSplits.putAll(sourceState.getPendingSplits());
+ }
+ }
+
+ @Override
+ public void open() {}
+
+ @Override
+ public void run() throws Exception {
+ Set<Integer> readers = enumeratorContext.registeredReaders();
+ if (shouldEnumerate) {
+ Set<AmazonDynamoDBSourceSplit> newSplits = discoverySplits();
+
+ synchronized (stateLock) {
+ addPendingSplit(newSplits);
+ shouldEnumerate = false;
+ }
+
+ assignSplit(readers);
+ }
+ log.debug(
+ "No more splits to assign." + " Sending NoMoreSplitsEvent to
reader {}.", readers);
+ readers.forEach(enumeratorContext::signalNoMoreSplits);
+ }
+
+ private void assignSplit(Set<Integer> readers) {
+ for (int reader : readers) {
+ List<AmazonDynamoDBSourceSplit> assignmentForReader =
pendingSplits.remove(reader);
+ if (assignmentForReader != null && !assignmentForReader.isEmpty())
{
+ log.info("Assign splits {} to reader {}", assignmentForReader,
reader);
+ try {
+ enumeratorContext.assignSplit(reader, assignmentForReader);
+ } catch (Exception e) {
+ log.error(
+ "Failed to assign splits {} to reader {}",
+ assignmentForReader,
+ reader,
+ e);
+ pendingSplits.put(reader, assignmentForReader);
+ }
+ }
+ }
+ }
+
+ private void addPendingSplit(Collection<AmazonDynamoDBSourceSplit> splits)
{
+ int readerCount = enumeratorContext.currentParallelism();
+ for (AmazonDynamoDBSourceSplit split : splits) {
+ int ownerReader = getSplitOwner(split.getTotalSegments(),
readerCount);
+ log.info("Assigning {} to {} reader.", split, ownerReader);
+ pendingSplits.computeIfAbsent(ownerReader, r -> new
ArrayList<>()).add(split);
+ }
+ }
+
+ private static int getSplitOwner(Integer tp, int numReaders) {
+ return (tp.hashCode() & Integer.MAX_VALUE) % numReaders;
+ }
+
+ private Set<AmazonDynamoDBSourceSplit> discoverySplits() {
+ Set<AmazonDynamoDBSourceSplit> allSplit = new HashSet<>();
+ int totalSegments = amazonDynamoDBSourceOptions.parallelScanThreads;
+ int itemLimit = amazonDynamoDBSourceOptions.scanItemLimit;
+ for (int i = 0; i < totalSegments; i++) {
+ AmazonDynamoDBSourceSplit split =
+ new AmazonDynamoDBSourceSplit(i, totalSegments, itemLimit);
+
+ allSplit.add(split);
+ }
+ return allSplit;
+ }
+
+ @Override
+ public void close() throws IOException {}
+
+ @Override
+ public void addSplitsBack(List<AmazonDynamoDBSourceSplit> splits, int
subtaskId) {
+ log.debug("Add back splits {} to
AmazonDynamoDBSourceSplitEnumerator.", splits);
+ if (!splits.isEmpty()) {
+ addPendingSplit(splits);
+ assignSplit(Collections.singleton(subtaskId));
+ }
+ }
+
+ @Override
+ public int currentUnassignedSplitSize() {
+ return pendingSplits.size();
+ }
+
+ @Override
+ public void handleSplitRequest(int subtaskId) {}
+
+ @Override
+ public void registerReader(int subtaskId) {
+ log.debug("Register reader {} to IoTDBSourceSplitEnumerator.",
subtaskId);
+ if (!pendingSplits.isEmpty()) {
+ assignSplit(Collections.singleton(subtaskId));
+ }
+ }
+
+ @Override
+ public AmazonDynamoDBSourceState snapshotState(long checkpointId) throws
Exception {
+ synchronized (stateLock) {
+ return new AmazonDynamoDBSourceState(shouldEnumerate,
pendingSplits);
+ }
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {}
+}
diff --git
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceState.java
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceState.java
new file mode 100644
index 0000000000..5ea15dc34a
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceState.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.source;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+@Getter
+@Setter
+@AllArgsConstructor
+public class AmazonDynamoDBSourceState implements Serializable {
+ private boolean shouldEnumerate;
+ private Map<Integer, List<AmazonDynamoDBSourceSplit>> pendingSplits;
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-amazondynamodb-e2e/src/test/resources/amazondynamodbIT_source_to_sink.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-amazondynamodb-e2e/src/test/resources/amazondynamodbIT_source_to_sink.conf
index 3d154f4897..55c2c46a51 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-amazondynamodb-e2e/src/test/resources/amazondynamodbIT_source_to_sink.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-amazondynamodb-e2e/src/test/resources/amazondynamodbIT_source_to_sink.conf
@@ -19,7 +19,7 @@
######
env {
- execution.parallelism = 1
+ execution.parallelism = 2
job.mode = "BATCH"
}
@@ -31,6 +31,7 @@ source {
access_key_id = "dummy-key"
secret_access_key = "dummy-secret"
table = "source_table"
+ parallelism = 2
schema = {
fields {
id = string
@@ -66,6 +67,8 @@ sink {
access_key_id = "dummy-key"
secret_access_key = "dummy-secret"
table = "sink_table"
+ scan_item_limit = 2
+ parallel_scan_threads=4
}
# If you would like to get more information about how to configure seatunnel
and see full list of sink plugins,
# please go to https://seatunnel.apache.org/docs/category/sink-v2