This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new a17dd7afc1 [Improve] Code clean for AmazonDynamoDB connector (#5791)
a17dd7afc1 is described below

commit a17dd7afc1806d6a980844fb0f3cca5a1ca8ef5c
Author: Jia Fan <[email protected]>
AuthorDate: Tue Nov 7 13:34:03 2023 +0800

    [Improve] Code clean for AmazonDynamoDB connector (#5791)
---
 .../AmazonDynamoDBConnectorException.java          | 10 --------
 .../amazondynamodb/sink/AmazonDynamoDBWriter.java  |  2 +-
 .../amazondynamodb/sink/DynamoDbSinkClient.java    | 27 +++-------------------
 .../source/AmazonDynamoDBSourceReader.java         | 20 ++++++----------
 4 files changed, 11 insertions(+), 48 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/exception/AmazonDynamoDBConnectorException.java
 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/exception/AmazonDynamoDBConnectorException.java
index 23665a5f07..6f7bddbece 100644
--- 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/exception/AmazonDynamoDBConnectorException.java
+++ 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/exception/AmazonDynamoDBConnectorException.java
@@ -25,14 +25,4 @@ public class AmazonDynamoDBConnectorException extends 
SeaTunnelRuntimeException
             SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) {
         super(seaTunnelErrorCode, errorMessage);
     }
-
-    public AmazonDynamoDBConnectorException(
-            SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage, 
Throwable cause) {
-        super(seaTunnelErrorCode, errorMessage, cause);
-    }
-
-    public AmazonDynamoDBConnectorException(
-            SeaTunnelErrorCode seaTunnelErrorCode, Throwable cause) {
-        super(seaTunnelErrorCode, cause);
-    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBWriter.java
 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBWriter.java
index d059bce7b5..aa27a4b714 100644
--- 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBWriter.java
+++ 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBWriter.java
@@ -35,7 +35,7 @@ public class AmazonDynamoDBWriter extends 
AbstractSinkWriter<SeaTunnelRow, Void>
     public AmazonDynamoDBWriter(
             AmazonDynamoDBSourceOptions amazondynamodbSourceOptions,
             SeaTunnelRowType seaTunnelRowType) {
-        dynamoDbSinkClient = new 
DynamoDbSinkClient(amazondynamodbSourceOptions, seaTunnelRowType);
+        dynamoDbSinkClient = new 
DynamoDbSinkClient(amazondynamodbSourceOptions);
         serializer =
                 new DefaultSeaTunnelRowSerializer(seaTunnelRowType, 
amazondynamodbSourceOptions);
     }
diff --git 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/DynamoDbSinkClient.java
 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/DynamoDbSinkClient.java
index e42f573dfb..b12ba15d9d 100644
--- 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/DynamoDbSinkClient.java
+++ 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/DynamoDbSinkClient.java
@@ -17,12 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.sink;
 
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.exception.CommonErrorCode;
 import 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSourceOptions;
-import 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.exception.AmazonDynamoDBConnectorException;
-import 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.serialize.DefaultSeaTunnelRowDeserializer;
-import 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.serialize.SeaTunnelRowDeserializer;
 
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
@@ -33,7 +28,6 @@ import 
software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
 import software.amazon.awssdk.services.dynamodb.model.PutRequest;
 import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
 
-import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -43,16 +37,12 @@ import java.util.Map;
 public class DynamoDbSinkClient {
     private final AmazonDynamoDBSourceOptions amazondynamodbSourceOptions;
     private volatile boolean initialize;
-    private volatile Exception flushException;
     private DynamoDbClient dynamoDbClient;
     private final List<WriteRequest> batchList;
-    protected SeaTunnelRowDeserializer seaTunnelRowDeserializer;
 
-    public DynamoDbSinkClient(
-            AmazonDynamoDBSourceOptions amazondynamodbSourceOptions, 
SeaTunnelRowType typeInfo) {
+    public DynamoDbSinkClient(AmazonDynamoDBSourceOptions 
amazondynamodbSourceOptions) {
         this.amazondynamodbSourceOptions = amazondynamodbSourceOptions;
         this.batchList = new ArrayList<>();
-        this.seaTunnelRowDeserializer = new 
DefaultSeaTunnelRowDeserializer(typeInfo);
     }
 
     private void tryInit() {
@@ -74,9 +64,8 @@ public class DynamoDbSinkClient {
         initialize = true;
     }
 
-    public synchronized void write(PutItemRequest putItemRequest) throws 
IOException {
+    public synchronized void write(PutItemRequest putItemRequest) {
         tryInit();
-        checkFlushException();
         batchList.add(
                 WriteRequest.builder()
                         
.putRequest(PutRequest.builder().item(putItemRequest.item()).build())
@@ -87,7 +76,7 @@ public class DynamoDbSinkClient {
         }
     }
 
-    public synchronized void close() throws IOException {
+    public synchronized void close() {
         if (dynamoDbClient != null) {
             flush();
             dynamoDbClient.close();
@@ -95,7 +84,6 @@ public class DynamoDbSinkClient {
     }
 
     synchronized void flush() {
-        checkFlushException();
         if (batchList.isEmpty()) {
             return;
         }
@@ -106,13 +94,4 @@ public class DynamoDbSinkClient {
 
         batchList.clear();
     }
-
-    private void checkFlushException() {
-        if (flushException != null) {
-            throw new AmazonDynamoDBConnectorException(
-                    CommonErrorCode.FLUSH_DATA_FAILED,
-                    "Flush data to AmazonDynamoDB failed.",
-                    flushException);
-        }
-    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceReader.java
 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceReader.java
index 7c84614e39..4f54e7f70f 100644
--- 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceReader.java
@@ -30,15 +30,12 @@ import 
software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
 import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
-import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
 import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
 import software.amazon.awssdk.services.dynamodb.paginators.ScanIterable;
 
-import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedDeque;
 
@@ -64,7 +61,7 @@ public class AmazonDynamoDBSourceReader
     }
 
     @Override
-    public void open() throws Exception {
+    public void open() {
         dynamoDbClient =
                 DynamoDbClient.builder()
                         
.endpointOverride(URI.create(amazondynamodbSourceOptions.getUrl()))
@@ -80,27 +77,26 @@ public class AmazonDynamoDBSourceReader
     }
 
     @Override
-    public void close() throws IOException {
+    public void close() {
         dynamoDbClient.close();
     }
 
     @Override
     @SuppressWarnings("magicnumber")
-    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
+    public void pollNext(Collector<SeaTunnelRow> output) {
         while (!pendingSplits.isEmpty()) {
             synchronized (output.getCheckpointLock()) {
                 AmazonDynamoDBSourceSplit split = pendingSplits.poll();
-
                 read(split, output);
             }
         }
-        if (pendingSplits.isEmpty() && noMoreSplit) {
+        if (noMoreSplit && pendingSplits.isEmpty()) {
             context.signalNoMoreElement();
         }
     }
 
     @Override
-    public List<AmazonDynamoDBSourceSplit> snapshotState(long checkpointId) 
throws Exception {
+    public List<AmazonDynamoDBSourceSplit> snapshotState(long checkpointId) {
         return new ArrayList<>(pendingSplits);
     }
 
@@ -115,9 +111,7 @@ public class AmazonDynamoDBSourceReader
         noMoreSplit = true;
     }
 
-    private void read(AmazonDynamoDBSourceSplit split, Collector<SeaTunnelRow> 
output)
-            throws Exception {
-        Map<String, AttributeValue> lastKeyEvaluated = null;
+    private void read(AmazonDynamoDBSourceSplit split, Collector<SeaTunnelRow> 
output) {
         ScanIterable scan;
         ScanRequest scanRequest =
                 ScanRequest.builder()
@@ -145,5 +139,5 @@ public class AmazonDynamoDBSourceReader
     }
 
     @Override
-    public void notifyCheckpointComplete(long checkpointId) throws Exception {}
+    public void notifyCheckpointComplete(long checkpointId) {}
 }

Reply via email to