This is an automated email from the ASF dual-hosted git repository.

liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 740c14422d [connector-v2] add amazondynamicdb source split (#5275)
740c14422d is described below

commit 740c14422d2d58f4618438691c59d4ea97cbf8be
Author: john <[email protected]>
AuthorDate: Wed Oct 25 16:59:34 2023 +0800

    [connector-v2] add amazondynamicdb source split (#5275)
    
    Co-authored-by: hailin0 <[email protected]>
---
 docs/en/connector-v2/source/AmazonDynamoDB.md      |  31 ++--
 .../config/AmazonDynamoDBConfig.java               |  14 ++
 .../config/AmazonDynamoDBSourceOptions.java        |  13 ++
 .../source/AmazonDynamoDBSource.java               |  36 ++++-
 .../source/AmazonDynamoDBSourceFactory.java        |   3 +
 .../source/AmazonDynamoDBSourceReader.java         |  93 ++++++++---
 .../source/AmazonDynamoDBSourceSplit.java          |  39 +++++
 .../AmazonDynamoDBSourceSplitEnumerator.java       | 172 +++++++++++++++++++++
 .../source/AmazonDynamoDBSourceState.java          |  34 ++++
 .../resources/amazondynamodbIT_source_to_sink.conf |   5 +-
 10 files changed, 398 insertions(+), 42 deletions(-)

diff --git a/docs/en/connector-v2/source/AmazonDynamoDB.md 
b/docs/en/connector-v2/source/AmazonDynamoDB.md
index ef5eee90e9..3261046b73 100644
--- a/docs/en/connector-v2/source/AmazonDynamoDB.md
+++ b/docs/en/connector-v2/source/AmazonDynamoDB.md
@@ -12,20 +12,22 @@ Read data from Amazon DynamoDB.
 - [ ] [stream](../../concept/connector-v2-features.md)
 - [ ] [exactly-once](../../concept/connector-v2-features.md)
 - [ ] [column projection](../../concept/connector-v2-features.md)
-- [ ] [parallelism](../../concept/connector-v2-features.md)
+- [x] [parallelism](../../concept/connector-v2-features.md)
 - [ ] [support user-defined split](../../concept/connector-v2-features.md)
 
 ## Options
 
-|       name        |  type  | required | default value |
-|-------------------|--------|----------|---------------|
-| url               | string | yes      | -             |
-| region            | string | yes      | -             |
-| access_key_id     | string | yes      | -             |
-| secret_access_key | string | yes      | -             |
-| table             | string | yes      | -             |
-| schema            | config | yes      | -             |
-| common-options    |        | yes      | -             |
+|         name          |  type  | required | default value |
+|-----------------------|--------|----------|---------------|
+| url                   | string | yes      | -             |
+| region                | string | yes      | -             |
+| access_key_id         | string | yes      | -             |
+| secret_access_key     | string | yes      | -             |
+| table                 | string | yes      | -             |
+| schema                | config | yes      | -             |
+| common-options        |        | yes      | -             |
+| scan_item_limit       | int    | no       | 1             |
+| parallel_scan_threads | int    | no       | 2             |
 
 ### url [string]
 
@@ -69,6 +71,14 @@ schema {
 
 Source Plugin common parameters, refer to [Source Plugin](common-options.md) 
for details
 
+### scan_item_limit [int]
+
+The number of items each scan request should return. The default value is `1`.
+
+### parallel_scan_threads [int]
+
+The number of logical segments the table is divided into for a parallel scan. The default value is `2`.
+
 ## Example
 
 ```bash
@@ -106,4 +116,5 @@ Amazondynamodb {
 ### next version
 
 - Add Amazon DynamoDB Source Connector
+- Add source split to Amazon DynamoDB Connector
 
diff --git 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBConfig.java
 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBConfig.java
index 5194e50f7c..87d69c74c1 100644
--- 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBConfig.java
+++ 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBConfig.java
@@ -60,4 +60,18 @@ public class AmazonDynamoDBConfig implements Serializable {
                     .intType()
                     .defaultValue(1000)
                     .withDescription("The batch interval of Amazon DynamoDB");
+
+    @SuppressWarnings("checkstyle:MagicNumber")
+    public static final Option<Integer> SCAN_ITEM_LIMIT =
+            Options.key("scan_item_limit")
+                    .intType()
+                    .defaultValue(1)
+                    .withDescription("number of item each scan request should 
return");
+
+    @SuppressWarnings("checkstyle:MagicNumber")
+    public static final Option<Integer> PARALLEL_SCAN_THREADS =
+            Options.key("parallel_scan_threads")
+                    .intType()
+                    .defaultValue(2)
+                    .withDescription("number of logical segments for parallel 
scan");
 }
diff --git 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBSourceOptions.java
 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBSourceOptions.java
index d5183c952e..bd4f32b08f 100644
--- 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBSourceOptions.java
+++ 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBSourceOptions.java
@@ -43,6 +43,9 @@ public class AmazonDynamoDBSourceOptions implements 
Serializable {
     private Config schema;
 
     public int batchSize = AmazonDynamoDBConfig.BATCH_SIZE.defaultValue();
+    public int batchIntervalMs = 
AmazonDynamoDBConfig.BATCH_INTERVAL_MS.defaultValue();
+    public int scanItemLimit = 
AmazonDynamoDBConfig.SCAN_ITEM_LIMIT.defaultValue();
+    public int parallelScanThreads = 
AmazonDynamoDBConfig.PARALLEL_SCAN_THREADS.defaultValue();
 
     public AmazonDynamoDBSourceOptions(Config config) {
         this.url = config.getString(AmazonDynamoDBConfig.URL.key());
@@ -56,5 +59,15 @@ public class AmazonDynamoDBSourceOptions implements 
Serializable {
         if (config.hasPath(AmazonDynamoDBConfig.BATCH_SIZE.key())) {
             this.batchSize = 
config.getInt(AmazonDynamoDBConfig.BATCH_SIZE.key());
         }
+        if (config.hasPath(AmazonDynamoDBConfig.BATCH_INTERVAL_MS.key())) {
+            this.batchIntervalMs = 
config.getInt(AmazonDynamoDBConfig.BATCH_INTERVAL_MS.key());
+        }
+        if (config.hasPath(AmazonDynamoDBConfig.SCAN_ITEM_LIMIT.key())) {
+            this.scanItemLimit = 
config.getInt(AmazonDynamoDBConfig.SCAN_ITEM_LIMIT.key());
+        }
+        if (config.hasPath(AmazonDynamoDBConfig.PARALLEL_SCAN_THREADS.key())) {
+            this.parallelScanThreads =
+                    
config.getInt(AmazonDynamoDBConfig.PARALLEL_SCAN_THREADS.key());
+        }
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSource.java
 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSource.java
index da84c4099b..e0175b4da7 100644
--- 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSource.java
+++ 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSource.java
@@ -23,7 +23,10 @@ import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
 import org.apache.seatunnel.api.source.SupportColumnProjection;
+import org.apache.seatunnel.api.source.SupportParallelism;
 import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -34,9 +37,6 @@ import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.PluginType;
 import 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSourceOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.exception.AmazonDynamoDBConnectorException;
-import 
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
-import 
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
-import 
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
 
 import com.google.auto.service.AutoService;
 import lombok.extern.slf4j.Slf4j;
@@ -49,8 +49,11 @@ import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.Am
 
 @Slf4j
 @AutoService(SeaTunnelSource.class)
-public class AmazonDynamoDBSource extends 
AbstractSingleSplitSource<SeaTunnelRow>
-        implements SupportColumnProjection {
+public class AmazonDynamoDBSource
+        implements SeaTunnelSource<
+                        SeaTunnelRow, AmazonDynamoDBSourceSplit, 
AmazonDynamoDBSourceState>,
+                SupportParallelism,
+                SupportColumnProjection {
 
     private AmazonDynamoDBSourceOptions amazondynamodbSourceOptions;
 
@@ -94,8 +97,27 @@ public class AmazonDynamoDBSource extends 
AbstractSingleSplitSource<SeaTunnelRow
     }
 
     @Override
-    public AbstractSingleSplitReader<SeaTunnelRow> createReader(
-            SingleSplitReaderContext readerContext) throws Exception {
+    public SourceSplitEnumerator<AmazonDynamoDBSourceSplit, 
AmazonDynamoDBSourceState>
+            createEnumerator(
+                    SourceSplitEnumerator.Context<AmazonDynamoDBSourceSplit> 
enumeratorContext)
+                    throws Exception {
+        return new AmazonDynamoDBSourceSplitEnumerator(
+                enumeratorContext, amazondynamodbSourceOptions);
+    }
+
+    @Override
+    public SourceSplitEnumerator<AmazonDynamoDBSourceSplit, 
AmazonDynamoDBSourceState>
+            restoreEnumerator(
+                    SourceSplitEnumerator.Context<AmazonDynamoDBSourceSplit> 
enumeratorContext,
+                    AmazonDynamoDBSourceState checkpointState)
+                    throws Exception {
+        return new AmazonDynamoDBSourceSplitEnumerator(
+                enumeratorContext, amazondynamodbSourceOptions, 
checkpointState);
+    }
+
+    @Override
+    public SourceReader<SeaTunnelRow, AmazonDynamoDBSourceSplit> createReader(
+            SourceReader.Context readerContext) throws Exception {
         return new AmazonDynamoDBSourceReader(readerContext, 
amazondynamodbSourceOptions, typeInfo);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceFactory.java
 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceFactory.java
index 994f862908..047d78c0d6 100644
--- 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceFactory.java
@@ -26,7 +26,9 @@ import 
org.apache.seatunnel.api.table.factory.TableSourceFactory;
 import com.google.auto.service.AutoService;
 
 import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.ACCESS_KEY_ID;
+import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.PARALLEL_SCAN_THREADS;
 import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.REGION;
+import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.SCAN_ITEM_LIMIT;
 import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.SECRET_ACCESS_KEY;
 import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.TABLE;
 import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.URL;
@@ -48,6 +50,7 @@ public class AmazonDynamoDBSourceFactory implements 
TableSourceFactory {
                         SECRET_ACCESS_KEY,
                         TABLE,
                         TableSchemaOptions.SCHEMA)
+                .optional(SCAN_ITEM_LIMIT, PARALLEL_SCAN_THREADS)
                 .build();
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceReader.java
 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceReader.java
index c25f8b0e0b..7c84614e39 100644
--- 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceReader.java
@@ -18,13 +18,12 @@
 package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.source;
 
 import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSourceOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.serialize.DefaultSeaTunnelRowDeserializer;
 import 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.serialize.SeaTunnelRowDeserializer;
-import 
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
-import 
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
 
 import lombok.extern.slf4j.Slf4j;
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
@@ -33,22 +32,30 @@ import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
 import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
 import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
-import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
+import software.amazon.awssdk.services.dynamodb.paginators.ScanIterable;
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedDeque;
 
 @Slf4j
-public class AmazonDynamoDBSourceReader extends 
AbstractSingleSplitReader<SeaTunnelRow> {
+public class AmazonDynamoDBSourceReader
+        implements SourceReader<SeaTunnelRow, AmazonDynamoDBSourceSplit> {
 
     protected DynamoDbClient dynamoDbClient;
-    protected SingleSplitReaderContext context;
+    protected SourceReader.Context context;
     protected AmazonDynamoDBSourceOptions amazondynamodbSourceOptions;
     protected SeaTunnelRowDeserializer seaTunnelRowDeserializer;
+    Queue<AmazonDynamoDBSourceSplit> pendingSplits = new 
ConcurrentLinkedDeque<>();
+
+    private volatile boolean noMoreSplit;
 
     public AmazonDynamoDBSourceReader(
-            SingleSplitReaderContext context,
+            SourceReader.Context context,
             AmazonDynamoDBSourceOptions amazondynamodbSourceOptions,
             SeaTunnelRowType typeInfo) {
         this.context = context;
@@ -80,25 +87,63 @@ public class AmazonDynamoDBSourceReader extends 
AbstractSingleSplitReader<SeaTun
     @Override
     @SuppressWarnings("magicnumber")
     public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
-        Map<String, AttributeValue> lastKeyEvaluated = null;
+        while (!pendingSplits.isEmpty()) {
+            synchronized (output.getCheckpointLock()) {
+                AmazonDynamoDBSourceSplit split = pendingSplits.poll();
 
-        ScanResponse scan;
-        do {
-            scan =
-                    dynamoDbClient.scan(
-                            ScanRequest.builder()
-                                    
.tableName(amazondynamodbSourceOptions.getTable())
-                                    .exclusiveStartKey(lastKeyEvaluated)
-                                    .build());
-            if (scan.hasItems()) {
-                scan.items()
-                        .forEach(
-                                item -> {
-                                    
output.collect(seaTunnelRowDeserializer.deserialize(item));
-                                });
+                read(split, output);
             }
-            lastKeyEvaluated = scan.lastEvaluatedKey();
-        } while (lastKeyEvaluated != null && !lastKeyEvaluated.isEmpty());
-        context.signalNoMoreElement();
+        }
+        if (pendingSplits.isEmpty() && noMoreSplit) {
+            context.signalNoMoreElement();
+        }
+    }
+
+    @Override
+    public List<AmazonDynamoDBSourceSplit> snapshotState(long checkpointId) 
throws Exception {
+        return new ArrayList<>(pendingSplits);
+    }
+
+    @Override
+    public void addSplits(List<AmazonDynamoDBSourceSplit> splits) {
+        this.pendingSplits.addAll(splits);
+    }
+
+    @Override
+    public void handleNoMoreSplits() {
+        log.info("Reader received noMoreSplit event.");
+        noMoreSplit = true;
     }
+
+    private void read(AmazonDynamoDBSourceSplit split, Collector<SeaTunnelRow> 
output)
+            throws Exception {
+        Map<String, AttributeValue> lastKeyEvaluated = null;
+        ScanIterable scan;
+        ScanRequest scanRequest =
+                ScanRequest.builder()
+                        .tableName(amazondynamodbSourceOptions.getTable())
+                        .limit(split.getItemCount())
+                        .segment(split.getSplitId())
+                        .totalSegments(split.getTotalSegments())
+                        .build();
+        scan = dynamoDbClient.scanPaginator(scanRequest);
+        do {
+
+            scan.items()
+                    .forEach(
+                            item -> {
+                                
output.collect(seaTunnelRowDeserializer.deserialize(item));
+                            });
+
+        } while (scan.iterator().hasNext() && !noMoreSplit);
+
+        if (noMoreSplit && pendingSplits.isEmpty()) {
+            // signal to the source that we have reached the end of the data.
+            log.info("Closed the bounded amazonDynamodb source");
+            context.signalNoMoreElement();
+        }
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {}
 }
diff --git 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceSplit.java
 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceSplit.java
new file mode 100644
index 0000000000..08a760998e
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceSplit.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.source;
+
+import org.apache.seatunnel.api.source.SourceSplit;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+
+@AllArgsConstructor
+@Getter
+@Setter
+public class AmazonDynamoDBSourceSplit implements SourceSplit {
+
+    private Integer splitId;
+    private Integer totalSegments;
+    private Integer itemCount;
+
+    @Override
+    public String splitId() {
+        return splitId.toString();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceSplitEnumerator.java
new file mode 100644
index 0000000000..48ac0a5bc8
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceSplitEnumerator.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.source;
+
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSourceOptions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class AmazonDynamoDBSourceSplitEnumerator
+        implements SourceSplitEnumerator<AmazonDynamoDBSourceSplit, 
AmazonDynamoDBSourceState> {
+
+    private static final Logger log =
+            LoggerFactory.getLogger(AmazonDynamoDBSourceSplitEnumerator.class);
+
+    private final SourceSplitEnumerator.Context<AmazonDynamoDBSourceSplit> 
enumeratorContext;
+    private final Map<Integer, List<AmazonDynamoDBSourceSplit>> pendingSplits;
+    private final AmazonDynamoDBSourceOptions amazonDynamoDBSourceOptions;
+
+    private final Object stateLock = new Object();
+    private volatile boolean shouldEnumerate;
+
+    public AmazonDynamoDBSourceSplitEnumerator(
+            Context<AmazonDynamoDBSourceSplit> enumeratorContext,
+            AmazonDynamoDBSourceOptions amazonDynamoDBSourceOptions) {
+        this(enumeratorContext, amazonDynamoDBSourceOptions, null);
+    }
+
+    public AmazonDynamoDBSourceSplitEnumerator(
+            Context<AmazonDynamoDBSourceSplit> enumeratorContext,
+            AmazonDynamoDBSourceOptions amazonDynamoDBSourceOptions,
+            AmazonDynamoDBSourceState sourceState) {
+        this.enumeratorContext = enumeratorContext;
+        this.amazonDynamoDBSourceOptions = amazonDynamoDBSourceOptions;
+        this.pendingSplits = new HashMap<>();
+        this.shouldEnumerate = sourceState == null;
+        if (sourceState != null) {
+            this.shouldEnumerate = sourceState.isShouldEnumerate();
+            this.pendingSplits.putAll(sourceState.getPendingSplits());
+        }
+    }
+
+    @Override
+    public void open() {}
+
+    @Override
+    public void run() throws Exception {
+        Set<Integer> readers = enumeratorContext.registeredReaders();
+        if (shouldEnumerate) {
+            Set<AmazonDynamoDBSourceSplit> newSplits = discoverySplits();
+
+            synchronized (stateLock) {
+                addPendingSplit(newSplits);
+                shouldEnumerate = false;
+            }
+
+            assignSplit(readers);
+        }
+        log.debug(
+                "No more splits to assign." + " Sending NoMoreSplitsEvent to 
reader {}.", readers);
+        readers.forEach(enumeratorContext::signalNoMoreSplits);
+    }
+
+    private void assignSplit(Set<Integer> readers) {
+        for (int reader : readers) {
+            List<AmazonDynamoDBSourceSplit> assignmentForReader = 
pendingSplits.remove(reader);
+            if (assignmentForReader != null && !assignmentForReader.isEmpty()) 
{
+                log.info("Assign splits {} to reader {}", assignmentForReader, 
reader);
+                try {
+                    enumeratorContext.assignSplit(reader, assignmentForReader);
+                } catch (Exception e) {
+                    log.error(
+                            "Failed to assign splits {} to reader {}",
+                            assignmentForReader,
+                            reader,
+                            e);
+                    pendingSplits.put(reader, assignmentForReader);
+                }
+            }
+        }
+    }
+
+    private void addPendingSplit(Collection<AmazonDynamoDBSourceSplit> splits) 
{
+        int readerCount = enumeratorContext.currentParallelism();
+        for (AmazonDynamoDBSourceSplit split : splits) {
+            int ownerReader = getSplitOwner(split.getTotalSegments(), 
readerCount);
+            log.info("Assigning {} to {} reader.", split, ownerReader);
+            pendingSplits.computeIfAbsent(ownerReader, r -> new 
ArrayList<>()).add(split);
+        }
+    }
+
+    private static int getSplitOwner(Integer tp, int numReaders) {
+        return (tp.hashCode() & Integer.MAX_VALUE) % numReaders;
+    }
+
+    private Set<AmazonDynamoDBSourceSplit> discoverySplits() {
+        Set<AmazonDynamoDBSourceSplit> allSplit = new HashSet<>();
+        int totalSegments = amazonDynamoDBSourceOptions.parallelScanThreads;
+        int itemLimit = amazonDynamoDBSourceOptions.scanItemLimit;
+        for (int i = 0; i < totalSegments; i++) {
+            AmazonDynamoDBSourceSplit split =
+                    new AmazonDynamoDBSourceSplit(i, totalSegments, itemLimit);
+
+            allSplit.add(split);
+        }
+        return allSplit;
+    }
+
+    @Override
+    public void close() throws IOException {}
+
+    @Override
+    public void addSplitsBack(List<AmazonDynamoDBSourceSplit> splits, int 
subtaskId) {
+        log.debug("Add back splits {} to 
AmazonDynamoDBSourceSplitEnumerator.", splits);
+        if (!splits.isEmpty()) {
+            addPendingSplit(splits);
+            assignSplit(Collections.singleton(subtaskId));
+        }
+    }
+
+    @Override
+    public int currentUnassignedSplitSize() {
+        return pendingSplits.size();
+    }
+
+    @Override
+    public void handleSplitRequest(int subtaskId) {}
+
+    @Override
+    public void registerReader(int subtaskId) {
+        log.debug("Register reader {} to IoTDBSourceSplitEnumerator.", 
subtaskId);
+        if (!pendingSplits.isEmpty()) {
+            assignSplit(Collections.singleton(subtaskId));
+        }
+    }
+
+    @Override
+    public AmazonDynamoDBSourceState snapshotState(long checkpointId) throws 
Exception {
+        synchronized (stateLock) {
+            return new AmazonDynamoDBSourceState(shouldEnumerate, 
pendingSplits);
+        }
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {}
+}
diff --git 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceState.java
 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceState.java
new file mode 100644
index 0000000000..5ea15dc34a
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceState.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.source;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+@Getter
+@Setter
+@AllArgsConstructor
+public class AmazonDynamoDBSourceState implements Serializable {
+    private boolean shouldEnumerate;
+    private Map<Integer, List<AmazonDynamoDBSourceSplit>> pendingSplits;
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-amazondynamodb-e2e/src/test/resources/amazondynamodbIT_source_to_sink.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-amazondynamodb-e2e/src/test/resources/amazondynamodbIT_source_to_sink.conf
index 3d154f4897..55c2c46a51 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-amazondynamodb-e2e/src/test/resources/amazondynamodbIT_source_to_sink.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-amazondynamodb-e2e/src/test/resources/amazondynamodbIT_source_to_sink.conf
@@ -19,7 +19,7 @@
 ######
 
 env {
-  execution.parallelism = 1
+  execution.parallelism = 2
   job.mode = "BATCH"
 }
 
@@ -31,6 +31,7 @@ source {
     access_key_id = "dummy-key"
     secret_access_key = "dummy-secret"
     table = "source_table"
+    parallelism = 2
     schema = {
       fields {
         id = string
@@ -66,6 +67,8 @@ sink {
     access_key_id = "dummy-key"
     secret_access_key = "dummy-secret"
     table = "sink_table"
+    scan_item_limit = 2
+    parallel_scan_threads=4
   }
   # If you would like to get more information about how to configure seatunnel 
and see full list of sink plugins,
   # please go to https://seatunnel.apache.org/docs/category/sink-v2

Reply via email to