This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new ac4e880fb [Feature][Connector-V2][File] Optimize filesystem utils (#3749)
ac4e880fb is described below
commit ac4e880fb58ece7e0920c962c11f4d768d6519b1
Author: Tyrantlucifer <[email protected]>
AuthorDate: Sat Dec 17 20:31:21 2022 +0800
[Feature][Connector-V2][File] Optimize filesystem utils (#3749)
---
...TextFileConfig.java => BaseFileSinkConfig.java} | 6 +--
.../seatunnel/file/config/FileFormat.java | 26 ++++++------
.../seatunnel/file/sink/BaseFileSink.java | 14 ++++---
.../seatunnel/file/sink/BaseFileSinkWriter.java | 12 ++++--
.../sink/commit/FileSinkAggregatedCommitter.java | 19 ++++-----
.../file/sink/commit/FileSinkCommitter.java | 18 +++++---
...TextFileSinkConfig.java => FileSinkConfig.java} | 6 +--
.../seatunnel/file/sink/util/FileSystemUtils.java | 37 ++++++++++-------
.../file/sink/writer/AbstractWriteStrategy.java | 48 +++++++++++++---------
.../file/sink/writer/JsonWriteStrategy.java | 7 ++--
.../file/sink/writer/OrcWriteStrategy.java | 6 +--
.../file/sink/writer/ParquetWriteStrategy.java | 6 +--
.../file/sink/writer/TextWriteStrategy.java | 7 ++--
.../seatunnel/file/sink/writer/WriteStrategy.java | 25 ++++++++++-
.../file/sink/writer/WriteStrategyFactory.java | 10 ++---
.../hive/commit/HiveSinkAggregatedCommitter.java | 7 ++--
.../connectors/seatunnel/hive/sink/HiveSink.java | 2 +-
.../commit/S3RedshiftSinkAggregatedCommitter.java | 9 ++--
.../seatunnel/redshift/sink/S3RedshiftSink.java | 2 +-
19 files changed, 159 insertions(+), 108 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseTextFileConfig.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
similarity index 96%
rename from
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseTextFileConfig.java
rename to
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
index 6171a845d..2befb3ae6 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseTextFileConfig.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
@@ -35,7 +35,7 @@ import java.io.Serializable;
import java.util.Locale;
@Data
-public class BaseTextFileConfig implements DelimiterConfig, CompressConfig,
Serializable {
+public class BaseFileSinkConfig implements DelimiterConfig, CompressConfig,
Serializable {
private static final long serialVersionUID = 1L;
protected String compressCodec;
protected String fieldDelimiter =
BaseSinkConfig.FIELD_DELIMITER.defaultValue();
@@ -48,7 +48,7 @@ public class BaseTextFileConfig implements DelimiterConfig,
CompressConfig, Seri
protected DateTimeUtils.Formatter datetimeFormat =
DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS;
protected TimeUtils.Formatter timeFormat = TimeUtils.Formatter.HH_MM_SS;
- public BaseTextFileConfig(@NonNull Config config) {
+ public BaseFileSinkConfig(@NonNull Config config) {
if (config.hasPath(BaseSinkConfig.COMPRESS_CODEC.key())) {
throw new
FileConnectorException(CommonErrorCode.UNSUPPORTED_OPERATION,
"Compress not supported by SeaTunnel file connector now");
@@ -94,5 +94,5 @@ public class BaseTextFileConfig implements DelimiterConfig,
CompressConfig, Seri
}
}
- public BaseTextFileConfig() {}
+ public BaseFileSinkConfig() {}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
index 4cddc2f4c..0cea4b15e 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
@@ -17,7 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.file.config;
-import
org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.JsonWriteStrategy;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.OrcWriteStrategy;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.ParquetWriteStrategy;
@@ -34,9 +34,9 @@ import java.io.Serializable;
public enum FileFormat implements Serializable {
CSV("csv") {
@Override
- public WriteStrategy getWriteStrategy(TextFileSinkConfig
textFileSinkConfig) {
- textFileSinkConfig.setFieldDelimiter(",");
- return new TextWriteStrategy(textFileSinkConfig);
+ public WriteStrategy getWriteStrategy(FileSinkConfig fileSinkConfig) {
+ fileSinkConfig.setFieldDelimiter(",");
+ return new TextWriteStrategy(fileSinkConfig);
}
@Override
@@ -46,8 +46,8 @@ public enum FileFormat implements Serializable {
},
TEXT("txt") {
@Override
- public WriteStrategy getWriteStrategy(TextFileSinkConfig
textFileSinkConfig) {
- return new TextWriteStrategy(textFileSinkConfig);
+ public WriteStrategy getWriteStrategy(FileSinkConfig fileSinkConfig) {
+ return new TextWriteStrategy(fileSinkConfig);
}
@Override
@@ -57,8 +57,8 @@ public enum FileFormat implements Serializable {
},
PARQUET("parquet") {
@Override
- public WriteStrategy getWriteStrategy(TextFileSinkConfig
textFileSinkConfig) {
- return new ParquetWriteStrategy(textFileSinkConfig);
+ public WriteStrategy getWriteStrategy(FileSinkConfig fileSinkConfig) {
+ return new ParquetWriteStrategy(fileSinkConfig);
}
@Override
@@ -68,8 +68,8 @@ public enum FileFormat implements Serializable {
},
ORC("orc") {
@Override
- public WriteStrategy getWriteStrategy(TextFileSinkConfig
textFileSinkConfig) {
- return new OrcWriteStrategy(textFileSinkConfig);
+ public WriteStrategy getWriteStrategy(FileSinkConfig fileSinkConfig) {
+ return new OrcWriteStrategy(fileSinkConfig);
}
@Override
@@ -79,8 +79,8 @@ public enum FileFormat implements Serializable {
},
JSON("json") {
@Override
- public WriteStrategy getWriteStrategy(TextFileSinkConfig
textFileSinkConfig) {
- return new JsonWriteStrategy(textFileSinkConfig);
+ public WriteStrategy getWriteStrategy(FileSinkConfig fileSinkConfig) {
+ return new JsonWriteStrategy(fileSinkConfig);
}
@Override
@@ -103,7 +103,7 @@ public enum FileFormat implements Serializable {
return null;
}
- public WriteStrategy getWriteStrategy(TextFileSinkConfig
textFileSinkConfig) {
+ public WriteStrategy getWriteStrategy(FileSinkConfig fileSinkConfig) {
return null;
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
index 6bbb2598b..647fe7214 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
@@ -31,8 +31,9 @@ import
org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter;
-import
org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategyFactory;
@@ -46,7 +47,8 @@ public abstract class BaseFileSink implements
SeaTunnelSink<SeaTunnelRow, FileSi
protected SeaTunnelRowType seaTunnelRowType;
protected Config pluginConfig;
protected HadoopConf hadoopConf;
- protected TextFileSinkConfig textFileSinkConfig;
+ protected FileSystemUtils fileSystemUtils;
+ protected FileSinkConfig fileSinkConfig;
protected WriteStrategy writeStrategy;
protected JobContext jobContext;
protected String jobId;
@@ -60,9 +62,11 @@ public abstract class BaseFileSink implements
SeaTunnelSink<SeaTunnelRow, FileSi
@Override
public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
this.seaTunnelRowType = seaTunnelRowType;
- this.textFileSinkConfig = new TextFileSinkConfig(pluginConfig,
seaTunnelRowType);
- this.writeStrategy =
WriteStrategyFactory.of(textFileSinkConfig.getFileFormat(), textFileSinkConfig);
+ this.fileSinkConfig = new FileSinkConfig(pluginConfig,
seaTunnelRowType);
+ this.writeStrategy =
WriteStrategyFactory.of(fileSinkConfig.getFileFormat(), fileSinkConfig);
+ this.fileSystemUtils = new FileSystemUtils(hadoopConf);
this.writeStrategy.setSeaTunnelRowTypeInfo(seaTunnelRowType);
+ this.writeStrategy.setFileSystemUtils(fileSystemUtils);
}
@Override
@@ -77,7 +81,7 @@ public abstract class BaseFileSink implements
SeaTunnelSink<SeaTunnelRow, FileSi
@Override
public Optional<SinkAggregatedCommitter<FileCommitInfo,
FileAggregatedCommitInfo>> createAggregatedCommitter() throws IOException {
- return Optional.of(new FileSinkAggregatedCommitter(hadoopConf));
+ return Optional.of(new FileSinkAggregatedCommitter(fileSystemUtils));
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java
index d5e499f0e..d0747cb1b 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java
@@ -42,10 +42,14 @@ import java.util.stream.Collectors;
public class BaseFileSinkWriter implements SinkWriter<SeaTunnelRow,
FileCommitInfo, FileSinkState> {
private final WriteStrategy writeStrategy;
+ private final FileSystemUtils fileSystemUtils;
@SuppressWarnings("checkstyle:MagicNumber")
- public BaseFileSinkWriter(WriteStrategy writeStrategy, HadoopConf
hadoopConf, SinkWriter.Context context, String jobId, List<FileSinkState>
fileSinkStates) {
+ public BaseFileSinkWriter(WriteStrategy writeStrategy, HadoopConf
hadoopConf,
+ SinkWriter.Context context, String jobId,
+ List<FileSinkState> fileSinkStates) {
this.writeStrategy = writeStrategy;
+ this.fileSystemUtils = writeStrategy.getFileSystemUtils();
int subTaskIndex = context.getIndexOfSubtask();
String uuidPrefix;
if (!fileSinkStates.isEmpty()) {
@@ -58,7 +62,7 @@ public class BaseFileSinkWriter implements
SinkWriter<SeaTunnelRow, FileCommitIn
if (!fileSinkStates.isEmpty()) {
try {
List<String> transactions = findTransactionList(jobId,
uuidPrefix);
- FileSinkAggregatedCommitter fileSinkAggregatedCommitter = new
FileSinkAggregatedCommitter(hadoopConf);
+ FileSinkAggregatedCommitter fileSinkAggregatedCommitter = new
FileSinkAggregatedCommitter(fileSystemUtils);
HashMap<String, FileSinkState> fileStatesMap = new HashMap<>();
fileSinkStates.forEach(fileSinkState ->
fileStatesMap.put(fileSinkState.getTransactionId(),
fileSinkState));
@@ -87,7 +91,9 @@ public class BaseFileSinkWriter implements
SinkWriter<SeaTunnelRow, FileCommitIn
}
private List<String> findTransactionList(String jobId, String uuidPrefix)
throws IOException {
- return
FileSystemUtils.dirList(AbstractWriteStrategy.getTransactionDirPrefix(writeStrategy.getFileSinkConfig().getTmpPath(),
jobId, uuidPrefix))
+ return
fileSystemUtils.dirList(AbstractWriteStrategy.getTransactionDirPrefix(writeStrategy
+ .getFileSinkConfig().getTmpPath(),
+ jobId, uuidPrefix))
.stream().map(Path::getName).collect(Collectors.toList());
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java
index 04c79a835..da51f26e1 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java
@@ -18,7 +18,6 @@
package org.apache.seatunnel.connectors.seatunnel.file.sink.commit;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
-import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
import lombok.extern.slf4j.Slf4j;
@@ -31,12 +30,10 @@ import java.util.Map;
@Slf4j
public class FileSinkAggregatedCommitter implements
SinkAggregatedCommitter<FileCommitInfo, FileAggregatedCommitInfo> {
- protected final HadoopConf hadoopConf;
+ protected final FileSystemUtils fileSystemUtils;
- public FileSinkAggregatedCommitter(HadoopConf hadoopConf) {
- this.hadoopConf = hadoopConf;
- FileSystemUtils.CONF = FileSystemUtils.getConfiguration(hadoopConf);
- log.info("Hadoop configuration initial done, [{}]", hadoopConf);
+ public FileSinkAggregatedCommitter(FileSystemUtils fileSystemUtils) {
+ this.fileSystemUtils = fileSystemUtils;
}
@Override
@@ -47,10 +44,10 @@ public class FileSinkAggregatedCommitter implements
SinkAggregatedCommitter<File
for (Map.Entry<String, Map<String, String>> entry :
aggregatedCommitInfo.getTransactionMap().entrySet()) {
for (Map.Entry<String, String> mvFileEntry :
entry.getValue().entrySet()) {
// first rename temp file
- FileSystemUtils.renameFile(mvFileEntry.getKey(),
mvFileEntry.getValue(), true);
+ fileSystemUtils.renameFile(mvFileEntry.getKey(),
mvFileEntry.getValue(), true);
}
// second delete transaction directory
- FileSystemUtils.deleteFile(entry.getKey());
+ fileSystemUtils.deleteFile(entry.getKey());
}
} catch (Exception e) {
log.error("commit aggregatedCommitInfo error,
aggregatedCommitInfo = {} ", aggregatedCommitInfo, e);
@@ -100,12 +97,12 @@ public class FileSinkAggregatedCommitter implements
SinkAggregatedCommitter<File
for (Map.Entry<String, Map<String, String>> entry :
aggregatedCommitInfo.getTransactionMap().entrySet()) {
// rollback the file
for (Map.Entry<String, String> mvFileEntry :
entry.getValue().entrySet()) {
- if (FileSystemUtils.fileExist(mvFileEntry.getValue())
&& !FileSystemUtils.fileExist(mvFileEntry.getKey())) {
- FileSystemUtils.renameFile(mvFileEntry.getValue(),
mvFileEntry.getKey(), true);
+ if (fileSystemUtils.fileExist(mvFileEntry.getValue())
&& !fileSystemUtils.fileExist(mvFileEntry.getKey())) {
+ fileSystemUtils.renameFile(mvFileEntry.getValue(),
mvFileEntry.getKey(), true);
}
}
// delete the transaction dir
- FileSystemUtils.deleteFile(entry.getKey());
+ fileSystemUtils.deleteFile(entry.getKey());
}
} catch (Exception e) {
log.error("abort aggregatedCommitInfo error ", e);
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkCommitter.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkCommitter.java
index 8c5d50e57..7d96c11be 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkCommitter.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkCommitter.java
@@ -25,8 +25,16 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+/**
+ * Deprecated since 2.3.0-beta; use {@link FileSinkAggregatedCommitter} instead.
+ */
@Deprecated
public class FileSinkCommitter implements SinkCommitter<FileCommitInfo> {
+ private final FileSystemUtils fileSystemUtils;
+
+ public FileSinkCommitter(FileSystemUtils fileSystemUtils) {
+ this.fileSystemUtils = fileSystemUtils;
+ }
@Override
public List<FileCommitInfo> commit(List<FileCommitInfo> commitInfos)
throws IOException {
@@ -35,12 +43,12 @@ public class FileSinkCommitter implements
SinkCommitter<FileCommitInfo> {
Map<String, String> needMoveFiles = commitInfo.getNeedMoveFiles();
needMoveFiles.forEach((k, v) -> {
try {
- FileSystemUtils.renameFile(k, v, true);
+ fileSystemUtils.renameFile(k, v, true);
} catch (IOException e) {
failedCommitInfos.add(commitInfo);
}
});
- FileSystemUtils.deleteFile(commitInfo.getTransactionDir());
+ fileSystemUtils.deleteFile(commitInfo.getTransactionDir());
}
return failedCommitInfos;
}
@@ -56,11 +64,11 @@ public class FileSinkCommitter implements
SinkCommitter<FileCommitInfo> {
for (FileCommitInfo commitInfo : commitInfos) {
Map<String, String> needMoveFiles = commitInfo.getNeedMoveFiles();
for (Map.Entry<String, String> entry : needMoveFiles.entrySet()) {
- if (FileSystemUtils.fileExist(entry.getValue()) &&
!FileSystemUtils.fileExist(entry.getKey())) {
- FileSystemUtils.renameFile(entry.getValue(),
entry.getKey(), true);
+ if (fileSystemUtils.fileExist(entry.getValue()) &&
!fileSystemUtils.fileExist(entry.getKey())) {
+ fileSystemUtils.renameFile(entry.getValue(),
entry.getKey(), true);
}
}
- FileSystemUtils.deleteFile(commitInfo.getTransactionDir());
+ fileSystemUtils.deleteFile(commitInfo.getTransactionDir());
}
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/TextFileSinkConfig.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/FileSinkConfig.java
similarity index 97%
rename from
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/TextFileSinkConfig.java
rename to
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/FileSinkConfig.java
index 968df88df..3f2d79376 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/TextFileSinkConfig.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/FileSinkConfig.java
@@ -21,8 +21,8 @@ import static
com.google.common.base.Preconditions.checkArgument;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.file.config.BaseFileSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig;
-import
org.apache.seatunnel.connectors.seatunnel.file.config.BaseTextFileConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.PartitionConfig;
import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
@@ -42,7 +42,7 @@ import java.util.Map;
import java.util.stream.Collectors;
@Data
-public class TextFileSinkConfig extends BaseTextFileConfig implements
PartitionConfig {
+public class FileSinkConfig extends BaseFileSinkConfig implements
PartitionConfig {
private List<String> sinkColumnList;
@@ -64,7 +64,7 @@ public class TextFileSinkConfig extends BaseTextFileConfig
implements PartitionC
private List<Integer> partitionFieldsIndexInRow;
- public TextFileSinkConfig(@NonNull Config config, @NonNull
SeaTunnelRowType seaTunnelRowTypeInfo) {
+ public FileSinkConfig(@NonNull Config config, @NonNull SeaTunnelRowType
seaTunnelRowTypeInfo) {
super(config);
checkArgument(!CollectionUtils.isEmpty(Arrays.asList(seaTunnelRowTypeInfo.getFieldNames())));
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java
index a3e3aa1c3..2ec9f986a 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java
@@ -31,18 +31,24 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
+import java.io.Serializable;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
@Slf4j
-public class FileSystemUtils {
+public class FileSystemUtils implements Serializable {
+ private static final int WRITE_BUFFER_SIZE = 2048;
- public static final int WRITE_BUFFER_SIZE = 2048;
+ private final HadoopConf hadoopConf;
- public static Configuration CONF;
+ private transient Configuration configuration;
- public static Configuration getConfiguration(HadoopConf hadoopConf) {
+ public FileSystemUtils(HadoopConf hadoopConf) {
+ this.hadoopConf = hadoopConf;
+ }
+
+ public Configuration getConfiguration(HadoopConf hadoopConf) {
Configuration configuration = new Configuration();
configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY,
hadoopConf.getHdfsNameKey());
configuration.set(String.format("fs.%s.impl", hadoopConf.getSchema()),
hadoopConf.getFsHdfsImpl());
@@ -50,19 +56,22 @@ public class FileSystemUtils {
return configuration;
}
- public static FileSystem getFileSystem(@NonNull String path) throws
IOException {
- FileSystem fileSystem =
FileSystem.get(URI.create(path.replaceAll("\\\\", "/")), CONF);
+ public FileSystem getFileSystem(@NonNull String path) throws IOException {
+ if (configuration == null) {
+ configuration = getConfiguration(hadoopConf);
+ }
+ FileSystem fileSystem =
FileSystem.get(URI.create(path.replaceAll("\\\\", "/")), configuration);
fileSystem.setWriteChecksum(false);
return fileSystem;
}
- public static FSDataOutputStream getOutputStream(@NonNull String
outFilePath) throws IOException {
+ public FSDataOutputStream getOutputStream(@NonNull String outFilePath)
throws IOException {
FileSystem fileSystem = getFileSystem(outFilePath);
Path path = new Path(outFilePath);
return fileSystem.create(path, true, WRITE_BUFFER_SIZE);
}
- public static void createFile(@NonNull String filePath) throws IOException
{
+ public void createFile(@NonNull String filePath) throws IOException {
FileSystem fileSystem = getFileSystem(filePath);
Path path = new Path(filePath);
if (!fileSystem.createNewFile(path)) {
@@ -71,7 +80,7 @@ public class FileSystemUtils {
}
}
- public static void deleteFile(@NonNull String file) throws IOException {
+ public void deleteFile(@NonNull String file) throws IOException {
FileSystem fileSystem = getFileSystem(file);
Path path = new Path(file);
if (fileSystem.exists(path)) {
@@ -90,7 +99,7 @@ public class FileSystemUtils {
* @param rmWhenExist if this is true, we will delete the target file when
it already exists
* @throws IOException throw IOException
*/
- public static void renameFile(@NonNull String oldName, @NonNull String
newName, boolean rmWhenExist)
+ public void renameFile(@NonNull String oldName, @NonNull String newName,
boolean rmWhenExist)
throws IOException {
FileSystem fileSystem = getFileSystem(newName);
log.info("begin rename file oldName :[" + oldName + "] to newName :["
+ newName + "]");
@@ -121,7 +130,7 @@ public class FileSystemUtils {
}
}
- public static void createDir(@NonNull String filePath) throws IOException {
+ public void createDir(@NonNull String filePath) throws IOException {
FileSystem fileSystem = getFileSystem(filePath);
Path dfs = new Path(filePath);
if (!fileSystem.mkdirs(dfs)) {
@@ -130,7 +139,7 @@ public class FileSystemUtils {
}
}
- public static boolean fileExist(@NonNull String filePath) throws
IOException {
+ public boolean fileExist(@NonNull String filePath) throws IOException {
FileSystem fileSystem = getFileSystem(filePath);
Path fileName = new Path(filePath);
return fileSystem.exists(fileName);
@@ -139,7 +148,7 @@ public class FileSystemUtils {
/**
* get the dir in filePath
*/
- public static List<Path> dirList(@NonNull String filePath) throws
IOException {
+ public List<Path> dirList(@NonNull String filePath) throws IOException {
FileSystem fileSystem = getFileSystem(filePath);
List<Path> pathList = new ArrayList<>();
if (!fileExist(filePath)) {
@@ -147,7 +156,7 @@ public class FileSystemUtils {
}
Path fileName = new Path(filePath);
FileStatus[] status = fileSystem.listStatus(fileName);
- if (status != null && status.length > 0) {
+ if (status != null) {
for (FileStatus fileStatus : status) {
if (fileStatus.isDirectory()) {
pathList.add(fileStatus.getPath());
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
index e8e92ce9a..264af8af3 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
@@ -32,12 +32,11 @@ import
org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
-import
org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
import com.google.common.collect.Lists;
-import lombok.Getter;
import lombok.NonNull;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
@@ -61,12 +60,12 @@ import java.util.stream.Collectors;
public abstract class AbstractWriteStrategy implements WriteStrategy {
protected final Logger log = LoggerFactory.getLogger(this.getClass());
- @Getter
- protected final TextFileSinkConfig textFileSinkConfig;
+ protected final FileSinkConfig fileSinkConfig;
protected final List<Integer> sinkColumnsIndexInRow;
protected String jobId;
protected int subTaskIndex;
protected HadoopConf hadoopConf;
+ protected FileSystemUtils fileSystemUtils;
protected String transactionId;
/**
* The uuid prefix to make sure same job different file sink will not
conflict.
@@ -84,10 +83,10 @@ public abstract class AbstractWriteStrategy implements
WriteStrategy {
protected int batchSize;
protected int currentBatchSize = 0;
- public AbstractWriteStrategy(TextFileSinkConfig textFileSinkConfig) {
- this.textFileSinkConfig = textFileSinkConfig;
- this.sinkColumnsIndexInRow =
textFileSinkConfig.getSinkColumnsIndexInRow();
- this.batchSize = textFileSinkConfig.getBatchSize();
+ public AbstractWriteStrategy(FileSinkConfig fileSinkConfig) {
+ this.fileSinkConfig = fileSinkConfig;
+ this.sinkColumnsIndexInRow = fileSinkConfig.getSinkColumnsIndexInRow();
+ this.batchSize = fileSinkConfig.getBatchSize();
}
/**
@@ -101,7 +100,6 @@ public abstract class AbstractWriteStrategy implements
WriteStrategy {
this.jobId = jobId;
this.subTaskIndex = subTaskIndex;
this.uuidPrefix = uuidPrefix;
- FileSystemUtils.CONF = getConfiguration(hadoopConf);
}
@Override
@@ -151,14 +149,14 @@ public abstract class AbstractWriteStrategy implements
WriteStrategy {
*/
@Override
public Map<String, List<String>> generatorPartitionDir(SeaTunnelRow
seaTunnelRow) {
- List<Integer> partitionFieldsIndexInRow =
textFileSinkConfig.getPartitionFieldsIndexInRow();
+ List<Integer> partitionFieldsIndexInRow =
fileSinkConfig.getPartitionFieldsIndexInRow();
Map<String, List<String>> partitionDirAndValuesMap = new HashMap<>(1);
if (CollectionUtils.isEmpty(partitionFieldsIndexInRow)) {
partitionDirAndValuesMap.put(BaseSinkConfig.NON_PARTITION, null);
return partitionDirAndValuesMap;
}
- List<String> partitionFieldList =
textFileSinkConfig.getPartitionFieldList();
- String partitionDirExpression =
textFileSinkConfig.getPartitionDirExpression();
+ List<String> partitionFieldList =
fileSinkConfig.getPartitionFieldList();
+ String partitionDirExpression =
fileSinkConfig.getPartitionDirExpression();
String[] keys = new String[partitionFieldList.size()];
String[] values = new String[partitionFieldList.size()];
for (int i = 0; i < partitionFieldList.size(); i++) {
@@ -197,12 +195,12 @@ public abstract class AbstractWriteStrategy implements
WriteStrategy {
*/
@Override
public String generateFileName(String transactionId) {
- String fileNameExpression = textFileSinkConfig.getFileNameExpression();
- FileFormat fileFormat = textFileSinkConfig.getFileFormat();
+ String fileNameExpression = fileSinkConfig.getFileNameExpression();
+ FileFormat fileFormat = fileSinkConfig.getFileFormat();
if (StringUtils.isBlank(fileNameExpression)) {
return transactionId + fileFormat.getSuffix();
}
- String timeFormat = textFileSinkConfig.getFileNameTimeFormat();
+ String timeFormat = fileSinkConfig.getFileNameTimeFormat();
DateTimeFormatter df = DateTimeFormatter.ofPattern(timeFormat);
String formattedDate = df.format(ZonedDateTime.now());
Map<String, String> valuesMap = new HashMap<>();
@@ -242,7 +240,7 @@ public abstract class AbstractWriteStrategy implements
WriteStrategy {
*/
public void abortPrepare(String transactionId) {
try {
- FileSystemUtils.deleteFile(getTransactionDir(transactionId));
+ fileSystemUtils.deleteFile(getTransactionDir(transactionId));
} catch (IOException e) {
throw new
FileConnectorException(CommonErrorCode.FILE_OPERATION_FAILED,
"Abort transaction " + transactionId + " error, delete
transaction directory failed", e);
@@ -292,7 +290,7 @@ public abstract class AbstractWriteStrategy implements
WriteStrategy {
* @return transaction directory
*/
private String getTransactionDir(@NonNull String transactionId) {
- String transactionDirectoryPrefix =
getTransactionDirPrefix(textFileSinkConfig.getTmpPath(), jobId, uuidPrefix);
+ String transactionDirectoryPrefix =
getTransactionDirPrefix(fileSinkConfig.getTmpPath(), jobId, uuidPrefix);
return String.join(File.separator, new
String[]{transactionDirectoryPrefix, transactionId});
}
@@ -321,7 +319,7 @@ public abstract class AbstractWriteStrategy implements
WriteStrategy {
public String getTargetLocation(@NonNull String seaTunnelFilePath) {
String tmpPath =
seaTunnelFilePath.replaceAll(Matcher.quoteReplacement(transactionDirectory),
- Matcher.quoteReplacement(textFileSinkConfig.getPath()));
+ Matcher.quoteReplacement(fileSinkConfig.getPath()));
return tmpPath.replaceAll(BaseSinkConfig.NON_PARTITION +
Matcher.quoteReplacement(File.separator), "");
}
@@ -331,7 +329,17 @@ public abstract class AbstractWriteStrategy implements
WriteStrategy {
}
@Override
- public TextFileSinkConfig getFileSinkConfig() {
- return textFileSinkConfig;
+ public FileSinkConfig getFileSinkConfig() {
+ return fileSinkConfig;
+ }
+
+ @Override
+ public FileSystemUtils getFileSystemUtils() {
+ return fileSystemUtils;
+ }
+
+ @Override
+ public void setFileSystemUtils(FileSystemUtils fileSystemUtils) {
+ this.fileSystemUtils = fileSystemUtils;
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
index 95585606c..9ae10fd35 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
@@ -22,8 +22,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
-import
org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
-import
org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
import org.apache.seatunnel.format.json.JsonSerializationSchema;
import lombok.NonNull;
@@ -39,7 +38,7 @@ public class JsonWriteStrategy extends AbstractWriteStrategy {
private final Map<String, FSDataOutputStream> beingWrittenOutputStream;
private final Map<String, Boolean> isFirstWrite;
- public JsonWriteStrategy(TextFileSinkConfig textFileSinkConfig) {
+ public JsonWriteStrategy(FileSinkConfig textFileSinkConfig) {
super(textFileSinkConfig);
this.beingWrittenOutputStream = new HashMap<>();
this.isFirstWrite = new HashMap<>();
@@ -94,7 +93,7 @@ public class JsonWriteStrategy extends AbstractWriteStrategy {
FSDataOutputStream fsDataOutputStream =
beingWrittenOutputStream.get(filePath);
if (fsDataOutputStream == null) {
try {
- fsDataOutputStream = FileSystemUtils.getOutputStream(filePath);
+ fsDataOutputStream = fileSystemUtils.getOutputStream(filePath);
beingWrittenOutputStream.put(filePath, fsDataOutputStream);
isFirstWrite.put(filePath, true);
} catch (IOException e) {
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
index a2e164d3b..af5b774e9 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
@@ -26,7 +26,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
-import
org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
import lombok.NonNull;
import org.apache.hadoop.fs.Path;
@@ -62,8 +62,8 @@ import java.util.Map;
public class OrcWriteStrategy extends AbstractWriteStrategy {
private final Map<String, Writer> beingWrittenWriter;
- public OrcWriteStrategy(TextFileSinkConfig textFileSinkConfig) {
- super(textFileSinkConfig);
+ public OrcWriteStrategy(FileSinkConfig fileSinkConfig) {
+ super(fileSinkConfig);
this.beingWrittenWriter = new HashMap<>();
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
index 7a23f1b82..6c431605a 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
@@ -27,7 +27,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
-import
org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
import lombok.NonNull;
import org.apache.avro.Conversions;
@@ -78,8 +78,8 @@ public class ParquetWriteStrategy extends
AbstractWriteStrategy {
}
}
- public ParquetWriteStrategy(TextFileSinkConfig textFileSinkConfig) {
- super(textFileSinkConfig);
+ public ParquetWriteStrategy(FileSinkConfig fileSinkConfig) {
+ super(fileSinkConfig);
this.beingWrittenWriter = new HashMap<>();
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
index d3aa838e2..7a6b6b329 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
@@ -25,8 +25,7 @@ import org.apache.seatunnel.common.utils.DateTimeUtils;
import org.apache.seatunnel.common.utils.DateUtils;
import org.apache.seatunnel.common.utils.TimeUtils;
import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
-import
org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
-import
org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
import org.apache.seatunnel.format.text.TextSerializationSchema;
import lombok.NonNull;
@@ -46,7 +45,7 @@ public class TextWriteStrategy extends AbstractWriteStrategy {
private final TimeUtils.Formatter timeFormat;
private SerializationSchema serializationSchema;
- public TextWriteStrategy(TextFileSinkConfig textFileSinkConfig) {
+ public TextWriteStrategy(FileSinkConfig textFileSinkConfig) {
super(textFileSinkConfig);
this.beingWrittenOutputStream = new HashMap<>();
this.isFirstWrite = new HashMap<>();
@@ -112,7 +111,7 @@ public class TextWriteStrategy extends
AbstractWriteStrategy {
FSDataOutputStream fsDataOutputStream =
beingWrittenOutputStream.get(filePath);
if (fsDataOutputStream == null) {
try {
- fsDataOutputStream = FileSystemUtils.getOutputStream(filePath);
+ fsDataOutputStream = fileSystemUtils.getOutputStream(filePath);
beingWrittenOutputStream.put(filePath, fsDataOutputStream);
isFirstWrite.put(filePath, true);
} catch (IOException e) {
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
index 4d716bf58..2b39f38e6 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
@@ -21,7 +21,8 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
-import
org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
import org.apache.hadoop.conf.Configuration;
@@ -76,7 +77,27 @@ public interface WriteStrategy extends Transaction,
Serializable {
*/
void finishAndCloseFile();
+ /**
+ * get current checkpoint id
+ * @return checkpoint id
+ */
long getCheckpointId();
- TextFileSinkConfig getFileSinkConfig();
+ /**
+ * get sink configuration
+ * @return sink configuration
+ */
+ FileSinkConfig getFileSinkConfig();
+
+ /**
+ * get file system utils
+ * @return file system utils
+ */
+ FileSystemUtils getFileSystemUtils();
+
+ /**
+ * set file system utils
+ * @param fileSystemUtils fileSystemUtils
+ */
+ void setFileSystemUtils(FileSystemUtils fileSystemUtils);
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategyFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategyFactory.java
index 03388cf58..5c7e46fd5 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategyFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategyFactory.java
@@ -20,7 +20,7 @@ package
org.apache.seatunnel.connectors.seatunnel.file.sink.writer;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
-import
org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
import lombok.extern.slf4j.Slf4j;
@@ -29,17 +29,17 @@ public class WriteStrategyFactory {
private WriteStrategyFactory() {}
- public static WriteStrategy of(String fileType, TextFileSinkConfig
textFileSinkConfig) {
+ public static WriteStrategy of(String fileType, FileSinkConfig
fileSinkConfig) {
try {
FileFormat fileFormat = FileFormat.valueOf(fileType.toUpperCase());
- return fileFormat.getWriteStrategy(textFileSinkConfig);
+ return fileFormat.getWriteStrategy(fileSinkConfig);
} catch (IllegalArgumentException e) {
String errorMsg = String.format("File sink connector not support
this file type [%s], please check your config", fileType);
throw new FileConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
errorMsg);
}
}
- public static WriteStrategy of(FileFormat fileFormat, TextFileSinkConfig
textFileSinkConfig) {
- return fileFormat.getWriteStrategy(textFileSinkConfig);
+ public static WriteStrategy of(FileFormat fileFormat, FileSinkConfig
fileSinkConfig) {
+ return fileFormat.getWriteStrategy(fileSinkConfig);
}
}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
index 01f5a9d27..b6c22e5f3 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
@@ -17,9 +17,9 @@
package org.apache.seatunnel.connectors.seatunnel.hive.commit;
-import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreProxy;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -38,8 +38,9 @@ public class HiveSinkAggregatedCommitter extends
FileSinkAggregatedCommitter {
private final String dbName;
private final String tableName;
- public HiveSinkAggregatedCommitter(Config pluginConfig, String dbName,
String tableName, HadoopConf hadoopConf) {
- super(hadoopConf);
+ public HiveSinkAggregatedCommitter(Config pluginConfig, String dbName,
+ String tableName, FileSystemUtils
fileSystemUtils) {
+ super(fileSystemUtils);
this.pluginConfig = pluginConfig;
this.dbName = dbName;
this.tableName = tableName;
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
index 0e5a60532..914c2b653 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
@@ -130,6 +130,6 @@ public class HiveSink extends BaseHdfsFileSink {
@Override
public Optional<SinkAggregatedCommitter<FileCommitInfo,
FileAggregatedCommitInfo>> createAggregatedCommitter() throws IOException {
- return Optional.of(new HiveSinkAggregatedCommitter(pluginConfig,
dbName, tableName, hadoopConf));
+ return Optional.of(new HiveSinkAggregatedCommitter(pluginConfig,
dbName, tableName, fileSystemUtils));
}
}
diff --git
a/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/commit/S3RedshiftSinkAggregatedCommitter.java
b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/commit/S3RedshiftSinkAggregatedCommitter.java
index 6e8267451..0010c3a5b 100644
---
a/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/commit/S3RedshiftSinkAggregatedCommitter.java
+++
b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/commit/S3RedshiftSinkAggregatedCommitter.java
@@ -18,7 +18,6 @@
package org.apache.seatunnel.connectors.seatunnel.redshift.commit;
import org.apache.seatunnel.common.exception.CommonErrorCode;
-import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
@@ -44,8 +43,8 @@ public class S3RedshiftSinkAggregatedCommitter extends
FileSinkAggregatedCommitt
private Config pluginConfig;
- public S3RedshiftSinkAggregatedCommitter(HadoopConf hadoopConf, Config
pluginConfig) {
- super(hadoopConf);
+ public S3RedshiftSinkAggregatedCommitter(FileSystemUtils fileSystemUtils,
Config pluginConfig) {
+ super(fileSystemUtils);
this.pluginConfig = pluginConfig;
this.executeSql =
pluginConfig.getString(S3RedshiftConfig.EXECUTE_SQL.key());
}
@@ -61,7 +60,7 @@ public class S3RedshiftSinkAggregatedCommitter extends
FileSinkAggregatedCommitt
log.debug("execute redshift sql is:" + sql);
RedshiftJdbcClient.getInstance(pluginConfig).execute(sql);
try {
- FileSystemUtils.deleteFile(tmpFileEntry.getKey());
+ fileSystemUtils.deleteFile(tmpFileEntry.getKey());
} catch (IOException e) {
log.warn("delete tmp file error:" +
tmpFileEntry.getKey());
}
@@ -86,7 +85,7 @@ public class S3RedshiftSinkAggregatedCommitter extends
FileSinkAggregatedCommitt
try {
for (Map.Entry<String, Map<String, String>> entry :
aggregatedCommitInfo.getTransactionMap().entrySet()) {
// delete the transaction dir
- FileSystemUtils.deleteFile(entry.getKey());
+ fileSystemUtils.deleteFile(entry.getKey());
}
} catch (Exception e) {
log.error("abort aggregatedCommitInfo error ", e);
diff --git
a/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/sink/S3RedshiftSink.java
b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/sink/S3RedshiftSink.java
index 06debc295..f2978f7ae 100644
---
a/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/sink/S3RedshiftSink.java
+++
b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/sink/S3RedshiftSink.java
@@ -62,6 +62,6 @@ public class S3RedshiftSink extends BaseHdfsFileSink {
@Override
public Optional<SinkAggregatedCommitter<FileCommitInfo,
FileAggregatedCommitInfo>> createAggregatedCommitter() {
- return Optional.of(new S3RedshiftSinkAggregatedCommitter(hadoopConf,
pluginConfig));
+ return Optional.of(new
S3RedshiftSinkAggregatedCommitter(fileSystemUtils, pluginConfig));
}
}