This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 09995159a0 [Bugfix][AmazonDynamoDB] Fix the problem that all table 
data cannot be obtained (#5146)
09995159a0 is described below

commit 09995159a0e47a6475f69c13e24b5d0ca6f3498f
Author: Guangdong Liu <[email protected]>
AuthorDate: Wed Aug 9 12:27:01 2023 +0800

    [Bugfix][AmazonDynamoDB] Fix the problem that all table data cannot be 
obtained (#5146)
---
 .../source/AmazonDynamoDBSourceReader.java         | 33 ++++++++++++++--------
 1 file changed, 21 insertions(+), 12 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceReader.java
 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceReader.java
index afaafa3f8a..c25f8b0e0b 100644
--- 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceReader.java
@@ -31,11 +31,13 @@ import 
software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
 import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
 import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
 import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.Map;
 
 @Slf4j
 public class AmazonDynamoDBSourceReader extends 
AbstractSingleSplitReader<SeaTunnelRow> {
@@ -78,18 +80,25 @@ public class AmazonDynamoDBSourceReader extends 
AbstractSingleSplitReader<SeaTun
     @Override
     @SuppressWarnings("magicnumber")
     public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
-        ScanResponse scan =
-                dynamoDbClient.scan(
-                        ScanRequest.builder()
-                                
.tableName(amazondynamodbSourceOptions.getTable())
-                                .build());
-        if (scan.hasItems()) {
-            scan.items()
-                    .forEach(
-                            item -> {
-                                
output.collect(seaTunnelRowDeserializer.deserialize(item));
-                            });
-        }
+        Map<String, AttributeValue> lastKeyEvaluated = null;
+
+        ScanResponse scan;
+        do {
+            scan =
+                    dynamoDbClient.scan(
+                            ScanRequest.builder()
+                                    
.tableName(amazondynamodbSourceOptions.getTable())
+                                    .exclusiveStartKey(lastKeyEvaluated)
+                                    .build());
+            if (scan.hasItems()) {
+                scan.items()
+                        .forEach(
+                                item -> {
+                                    
output.collect(seaTunnelRowDeserializer.deserialize(item));
+                                });
+            }
+            lastKeyEvaluated = scan.lastEvaluatedKey();
+        } while (lastKeyEvaluated != null && !lastKeyEvaluated.isEmpty());
         context.signalNoMoreElement();
     }
 }

Reply via email to