This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 7376ec7ab [Improve][Connector-V2] Optimize the code structure (#2380)
7376ec7ab is described below
commit 7376ec7ab1bf02e56d89f051eedb827305736af5
Author: TyrantLucifer <[email protected]>
AuthorDate: Mon Aug 8 09:05:11 2022 +0800
[Improve][Connector-V2] Optimize the code structure (#2380)
1. Optimize list traversal
2. Optimize map key judge logic
3. Add type for array list
4. Make class attribute final
---
.../file/sink/FileSinkAggregatedCommitter.java | 18 ++++++------------
1 file changed, 6 insertions(+), 12 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/FileSinkAggregatedCommitter.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/FileSinkAggregatedCommitter.java
index 3c7c8cf9c..4d68d75ad 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/FileSinkAggregatedCommitter.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/FileSinkAggregatedCommitter.java
@@ -34,8 +34,7 @@ import java.util.Set;
public class FileSinkAggregatedCommitter implements
SinkAggregatedCommitter<FileCommitInfo, FileAggregatedCommitInfo> {
private static final Logger LOGGER =
LoggerFactory.getLogger(FileSinkAggregatedCommitter.class);
-
- private FileSystemCommitter fileSystemCommitter;
+ private final FileSystemCommitter fileSystemCommitter;
public FileSinkAggregatedCommitter(@NonNull FileSystemCommitter
fileSystemCommitter) {
this.fileSystemCommitter = fileSystemCommitter;
@@ -46,8 +45,8 @@ public class FileSinkAggregatedCommitter implements
SinkAggregatedCommitter<File
if (aggregatedCommitInfoList == null ||
aggregatedCommitInfoList.size() == 0) {
return null;
}
- List errorAggregatedCommitInfoList = new ArrayList();
- aggregatedCommitInfoList.stream().forEach(aggregateCommitInfo -> {
+ List<FileAggregatedCommitInfo> errorAggregatedCommitInfoList = new
ArrayList<>();
+ aggregatedCommitInfoList.forEach(aggregateCommitInfo -> {
try {
fileSystemCommitter.commitTransaction(aggregateCommitInfo);
} catch (Exception e) {
@@ -66,12 +65,8 @@ public class FileSinkAggregatedCommitter implements
SinkAggregatedCommitter<File
}
Map<String, Map<String, String>> aggregateCommitInfo = new HashMap<>();
Map<String, List<String>> partitionDirAndValsMap = new HashMap<>();
- commitInfos.stream().forEach(commitInfo -> {
- Map<String, String> needMoveFileMap =
aggregateCommitInfo.get(commitInfo.getTransactionDir());
- if (needMoveFileMap == null) {
- needMoveFileMap = new HashMap<>();
- aggregateCommitInfo.put(commitInfo.getTransactionDir(),
needMoveFileMap);
- }
+ commitInfos.forEach(commitInfo -> {
+ Map<String, String> needMoveFileMap =
aggregateCommitInfo.computeIfAbsent(commitInfo.getTransactionDir(), k -> new
HashMap<>());
needMoveFileMap.putAll(commitInfo.getNeedMoveFiles());
Set<Map.Entry<String, List<String>>> entries =
commitInfo.getPartitionDirAndValsMap().entrySet();
if (!CollectionUtils.isEmpty(entries)) {
@@ -86,10 +81,9 @@ public class FileSinkAggregatedCommitter implements
SinkAggregatedCommitter<File
if (aggregatedCommitInfoList == null ||
aggregatedCommitInfoList.size() == 0) {
return;
}
- aggregatedCommitInfoList.stream().forEach(aggregateCommitInfo -> {
+ aggregatedCommitInfoList.forEach(aggregateCommitInfo -> {
try {
fileSystemCommitter.abortTransaction(aggregateCommitInfo);
-
} catch (Exception e) {
LOGGER.error("abort aggregateCommitInfo error ", e);
}