This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new a17dd7afc1 [Improve] Code clean for AmazonDynamoDB connector (#5791)
a17dd7afc1 is described below
commit a17dd7afc1806d6a980844fb0f3cca5a1ca8ef5c
Author: Jia Fan <[email protected]>
AuthorDate: Tue Nov 7 13:34:03 2023 +0800
[Improve] Code clean for AmazonDynamoDB connector (#5791)
---
.../AmazonDynamoDBConnectorException.java | 10 --------
.../amazondynamodb/sink/AmazonDynamoDBWriter.java | 2 +-
.../amazondynamodb/sink/DynamoDbSinkClient.java | 27 +++-------------------
.../source/AmazonDynamoDBSourceReader.java | 20 ++++++----------
4 files changed, 11 insertions(+), 48 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/exception/AmazonDynamoDBConnectorException.java
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/exception/AmazonDynamoDBConnectorException.java
index 23665a5f07..6f7bddbece 100644
---
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/exception/AmazonDynamoDBConnectorException.java
+++
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/exception/AmazonDynamoDBConnectorException.java
@@ -25,14 +25,4 @@ public class AmazonDynamoDBConnectorException extends
SeaTunnelRuntimeException
SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) {
super(seaTunnelErrorCode, errorMessage);
}
-
- public AmazonDynamoDBConnectorException(
- SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage,
Throwable cause) {
- super(seaTunnelErrorCode, errorMessage, cause);
- }
-
- public AmazonDynamoDBConnectorException(
- SeaTunnelErrorCode seaTunnelErrorCode, Throwable cause) {
- super(seaTunnelErrorCode, cause);
- }
}
diff --git
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBWriter.java
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBWriter.java
index d059bce7b5..aa27a4b714 100644
---
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBWriter.java
+++
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBWriter.java
@@ -35,7 +35,7 @@ public class AmazonDynamoDBWriter extends
AbstractSinkWriter<SeaTunnelRow, Void>
public AmazonDynamoDBWriter(
AmazonDynamoDBSourceOptions amazondynamodbSourceOptions,
SeaTunnelRowType seaTunnelRowType) {
- dynamoDbSinkClient = new
DynamoDbSinkClient(amazondynamodbSourceOptions, seaTunnelRowType);
+ dynamoDbSinkClient = new
DynamoDbSinkClient(amazondynamodbSourceOptions);
serializer =
new DefaultSeaTunnelRowSerializer(seaTunnelRowType,
amazondynamodbSourceOptions);
}
diff --git
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/DynamoDbSinkClient.java
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/DynamoDbSinkClient.java
index e42f573dfb..b12ba15d9d 100644
---
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/DynamoDbSinkClient.java
+++
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/DynamoDbSinkClient.java
@@ -17,12 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.sink;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.exception.CommonErrorCode;
import
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSourceOptions;
-import
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.exception.AmazonDynamoDBConnectorException;
-import
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.serialize.DefaultSeaTunnelRowDeserializer;
-import
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.serialize.SeaTunnelRowDeserializer;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
@@ -33,7 +28,6 @@ import
software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.PutRequest;
import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
-import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
@@ -43,16 +37,12 @@ import java.util.Map;
public class DynamoDbSinkClient {
private final AmazonDynamoDBSourceOptions amazondynamodbSourceOptions;
private volatile boolean initialize;
- private volatile Exception flushException;
private DynamoDbClient dynamoDbClient;
private final List<WriteRequest> batchList;
- protected SeaTunnelRowDeserializer seaTunnelRowDeserializer;
- public DynamoDbSinkClient(
- AmazonDynamoDBSourceOptions amazondynamodbSourceOptions,
SeaTunnelRowType typeInfo) {
+ public DynamoDbSinkClient(AmazonDynamoDBSourceOptions
amazondynamodbSourceOptions) {
this.amazondynamodbSourceOptions = amazondynamodbSourceOptions;
this.batchList = new ArrayList<>();
- this.seaTunnelRowDeserializer = new
DefaultSeaTunnelRowDeserializer(typeInfo);
}
private void tryInit() {
@@ -74,9 +64,8 @@ public class DynamoDbSinkClient {
initialize = true;
}
- public synchronized void write(PutItemRequest putItemRequest) throws
IOException {
+ public synchronized void write(PutItemRequest putItemRequest) {
tryInit();
- checkFlushException();
batchList.add(
WriteRequest.builder()
.putRequest(PutRequest.builder().item(putItemRequest.item()).build())
@@ -87,7 +76,7 @@ public class DynamoDbSinkClient {
}
}
- public synchronized void close() throws IOException {
+ public synchronized void close() {
if (dynamoDbClient != null) {
flush();
dynamoDbClient.close();
@@ -95,7 +84,6 @@ public class DynamoDbSinkClient {
}
synchronized void flush() {
- checkFlushException();
if (batchList.isEmpty()) {
return;
}
@@ -106,13 +94,4 @@ public class DynamoDbSinkClient {
batchList.clear();
}
-
- private void checkFlushException() {
- if (flushException != null) {
- throw new AmazonDynamoDBConnectorException(
- CommonErrorCode.FLUSH_DATA_FAILED,
- "Flush data to AmazonDynamoDB failed.",
- flushException);
- }
- }
}
diff --git
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceReader.java
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceReader.java
index 7c84614e39..4f54e7f70f 100644
---
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceReader.java
+++
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceReader.java
@@ -30,15 +30,12 @@ import
software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
-import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.paginators.ScanIterable;
-import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;
@@ -64,7 +61,7 @@ public class AmazonDynamoDBSourceReader
}
@Override
- public void open() throws Exception {
+ public void open() {
dynamoDbClient =
DynamoDbClient.builder()
.endpointOverride(URI.create(amazondynamodbSourceOptions.getUrl()))
@@ -80,27 +77,26 @@ public class AmazonDynamoDBSourceReader
}
@Override
- public void close() throws IOException {
+ public void close() {
dynamoDbClient.close();
}
@Override
@SuppressWarnings("magicnumber")
- public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
+ public void pollNext(Collector<SeaTunnelRow> output) {
while (!pendingSplits.isEmpty()) {
synchronized (output.getCheckpointLock()) {
AmazonDynamoDBSourceSplit split = pendingSplits.poll();
-
read(split, output);
}
}
- if (pendingSplits.isEmpty() && noMoreSplit) {
+ if (noMoreSplit && pendingSplits.isEmpty()) {
context.signalNoMoreElement();
}
}
@Override
- public List<AmazonDynamoDBSourceSplit> snapshotState(long checkpointId)
throws Exception {
+ public List<AmazonDynamoDBSourceSplit> snapshotState(long checkpointId) {
return new ArrayList<>(pendingSplits);
}
@@ -115,9 +111,7 @@ public class AmazonDynamoDBSourceReader
noMoreSplit = true;
}
- private void read(AmazonDynamoDBSourceSplit split, Collector<SeaTunnelRow>
output)
- throws Exception {
- Map<String, AttributeValue> lastKeyEvaluated = null;
+ private void read(AmazonDynamoDBSourceSplit split, Collector<SeaTunnelRow>
output) {
ScanIterable scan;
ScanRequest scanRequest =
ScanRequest.builder()
@@ -145,5 +139,5 @@ public class AmazonDynamoDBSourceReader
}
@Override
- public void notifyCheckpointComplete(long checkpointId) throws Exception {}
+ public void notifyCheckpointComplete(long checkpointId) {}
}