This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new dd5c35388 [Improve][Connector-V2][File] Improve code structure (#3238)
dd5c35388 is described below
commit dd5c353881eddaeb0fb0b3206429ab74b4aeb789
Author: Tyrantlucifer <[email protected]>
AuthorDate: Wed Nov 9 17:22:26 2022 +0800
[Improve][Connector-V2][File] Improve code structure (#3238)
* [Improve][Connector-V2][File] Improve code structure
* [Improve][Connector-V2][File] Fix unit tests
* [Improve][Connector-V2][File] Fix NPE
* [Feature][Connector-V2][Hive] Fix file e2e
* [Hotfix][Connector-V2][File] Fix file e2e
---
.../seatunnel/file/config/HadoopConf.java | 11 ++++++++-
.../file/sink/writer/AbstractWriteStrategy.java | 12 ++++------
.../file/source/reader/AbstractReadStrategy.java | 11 +++------
.../file/source/reader/ParquetReadStrategy.java | 2 +-
.../seatunnel/file/writer/OrcReadStrategyTest.java | 24 ++++++++++++++++++-
.../file/writer/ParquetReadStrategyTest.java | 27 ++++++++++++++++++++--
.../seatunnel/file/ftp/config/FtpConf.java | 11 ++++++---
.../seatunnel/file/local/config/LocalConf.java | 13 ++++++++++-
.../seatunnel/file/oss/config/OssConf.java | 10 ++++++--
.../seatunnel/file/s3/config/S3Conf.java | 11 ++++++---
.../seatunnel/file/sftp/config/SftpConf.java | 13 ++++++++++-
11 files changed, 115 insertions(+), 30 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java
index 6da660421..2c59d46ee 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java
@@ -26,14 +26,23 @@ import java.util.Map;
@Data
public class HadoopConf implements Serializable {
+ private static final String HDFS_IMPL =
"org.apache.hadoop.hdfs.DistributedFileSystem";
+ private static final String SCHEMA = "hdfs";
protected Map<String, String> extraOptions = new HashMap<>();
protected String hdfsNameKey;
- protected String fsHdfsImpl =
"org.apache.hadoop.hdfs.DistributedFileSystem";
public HadoopConf(String hdfsNameKey) {
this.hdfsNameKey = hdfsNameKey;
}
+ public String getFsHdfsImpl() {
+ return HDFS_IMPL;
+ }
+
+ public String getSchema() {
+ return SCHEMA;
+ }
+
public void setExtraOptionsForConfiguration(Configuration configuration) {
if (!extraOptions.isEmpty()) {
extraOptions.forEach(configuration::set);
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
index b154c7cbc..524c01593 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
@@ -94,21 +94,19 @@ public abstract class AbstractWriteStrategy implements
WriteStrategy {
/**
* use hadoop conf generate hadoop configuration
*
- * @param conf hadoop conf
+ * @param hadoopConf conf supplying fs.defaultFS, the fs.<schema>.impl class and extra options
* @return Configuration
*/
@Override
- public Configuration getConfiguration(HadoopConf conf) {
+ public Configuration getConfiguration(HadoopConf hadoopConf) {
Configuration configuration = new Configuration();
configuration.setBoolean(READ_INT96_AS_FIXED, true);
configuration.setBoolean(WRITE_FIXED_AS_INT96, true);
configuration.setBoolean(ADD_LIST_ELEMENT_RECORDS, false);
configuration.setBoolean(WRITE_OLD_LIST_STRUCTURE, false);
- if (hadoopConf != null) {
- configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY,
hadoopConf.getHdfsNameKey());
- configuration.set("fs.hdfs.impl", hadoopConf.getFsHdfsImpl());
- hadoopConf.setExtraOptionsForConfiguration(configuration);
- }
+ configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY,
hadoopConf.getHdfsNameKey());
+ configuration.set(String.format("fs.%s.impl", hadoopConf.getSchema()),
hadoopConf.getFsHdfsImpl());
+ hadoopConf.setExtraOptionsForConfiguration(configuration);
return configuration;
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
index bdcd2e029..f8288a71f 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
@@ -87,18 +87,13 @@ public abstract class AbstractReadStrategy implements
ReadStrategy {
configuration.setBoolean(WRITE_FIXED_AS_INT96, true);
configuration.setBoolean(ADD_LIST_ELEMENT_RECORDS, false);
configuration.setBoolean(WRITE_OLD_LIST_STRUCTURE, false);
- if (hadoopConf != null) {
- configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY,
hadoopConf.getHdfsNameKey());
- configuration.set("fs.hdfs.impl", hadoopConf.getFsHdfsImpl());
- hadoopConf.setExtraOptionsForConfiguration(configuration);
- }
+ configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY,
hadoopConf.getHdfsNameKey());
+ configuration.set(String.format("fs.%s.impl", hadoopConf.getSchema()),
hadoopConf.getFsHdfsImpl());
+ hadoopConf.setExtraOptionsForConfiguration(configuration);
return configuration;
}
Configuration getConfiguration() throws FilePluginException {
- if (null == hadoopConf) {
- log.info("Local file reader didn't need hadoopConf");
- }
return getConfiguration(hadoopConf);
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
index 52df095a4..212f79821 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
@@ -202,7 +202,7 @@ public class ParquetReadStrategy extends
AbstractReadStrategy {
Path filePath = new Path(path);
ParquetMetadata metadata;
try {
- HadoopInputFile hadoopInputFile =
HadoopInputFile.fromPath(filePath, getConfiguration());
+ HadoopInputFile hadoopInputFile =
HadoopInputFile.fromPath(filePath, getConfiguration(hadoopConf));
ParquetFileReader reader = ParquetFileReader.open(hadoopInputFile);
metadata = reader.getFooter();
reader.close();
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcReadStrategyTest.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcReadStrategyTest.java
index dc32fbe22..454bf907f 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcReadStrategyTest.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcReadStrategyTest.java
@@ -17,8 +17,11 @@
package org.apache.seatunnel.connectors.seatunnel.file.writer;
+import static
org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT;
+
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import
org.apache.seatunnel.connectors.seatunnel.file.source.reader.OrcReadStrategy;
import org.junit.jupiter.api.Assertions;
@@ -35,7 +38,8 @@ public class OrcReadStrategyTest {
assert resource != null;
String path = Paths.get(resource.toURI()).toString();
OrcReadStrategy orcReadStrategy = new OrcReadStrategy();
- orcReadStrategy.init(null);
+ LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+ orcReadStrategy.init(localConf);
TestCollector testCollector = new TestCollector();
orcReadStrategy.read(path, testCollector);
}
@@ -55,4 +59,22 @@ public class OrcReadStrategyTest {
}
}
+ public static class LocalConf extends HadoopConf {
+ private static final String HDFS_IMPL =
"org.apache.hadoop.fs.LocalFileSystem";
+ private static final String SCHEMA = "file";
+
+ public LocalConf(String hdfsNameKey) {
+ super(hdfsNameKey);
+ }
+
+ @Override
+ public String getFsHdfsImpl() {
+ return HDFS_IMPL;
+ }
+
+ @Override
+ public String getSchema() {
+ return SCHEMA;
+ }
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
index c86d54e61..acb8cf38d 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
@@ -17,9 +17,12 @@
package org.apache.seatunnel.connectors.seatunnel.file.writer;
+import static
org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT;
+
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import
org.apache.seatunnel.connectors.seatunnel.file.source.reader.ParquetReadStrategy;
import org.junit.jupiter.api.Test;
@@ -34,8 +37,9 @@ public class ParquetReadStrategyTest {
assert resource != null;
String path = Paths.get(resource.toURI()).toString();
ParquetReadStrategy parquetReadStrategy = new ParquetReadStrategy();
- parquetReadStrategy.init(null);
- SeaTunnelRowType seaTunnelRowTypeInfo =
parquetReadStrategy.getSeaTunnelRowTypeInfo(null, path);
+ LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+ parquetReadStrategy.init(localConf);
+ SeaTunnelRowType seaTunnelRowTypeInfo =
parquetReadStrategy.getSeaTunnelRowTypeInfo(localConf, path);
assert seaTunnelRowTypeInfo != null;
TestCollector testCollector = new TestCollector();
parquetReadStrategy.read(path, testCollector);
@@ -54,4 +58,23 @@ public class ParquetReadStrategyTest {
return null;
}
}
+
+ public static class LocalConf extends HadoopConf {
+ private static final String HDFS_IMPL =
"org.apache.hadoop.fs.LocalFileSystem";
+ private static final String SCHEMA = "file";
+
+ public LocalConf(String hdfsNameKey) {
+ super(hdfsNameKey);
+ }
+
+ @Override
+ public String getFsHdfsImpl() {
+ return HDFS_IMPL;
+ }
+
+ @Override
+ public String getSchema() {
+ return SCHEMA;
+ }
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FtpConf.java
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FtpConf.java
index add437bcd..300532ac0 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FtpConf.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FtpConf.java
@@ -24,7 +24,8 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import java.util.HashMap;
public class FtpConf extends HadoopConf {
- protected String fsHdfsImpl =
"org.apache.seatunnel.connectors.seatunnel.file.ftp.system.SeaTunnelFTPFileSystem";
+ private static final String HDFS_IMPL =
"org.apache.seatunnel.connectors.seatunnel.file.ftp.system.SeaTunnelFTPFileSystem";
+ private static final String SCHEMA = "ftp";
private FtpConf(String hdfsNameKey) {
super(hdfsNameKey);
@@ -32,7 +33,12 @@ public class FtpConf extends HadoopConf {
@Override
public String getFsHdfsImpl() {
- return fsHdfsImpl;
+ return HDFS_IMPL;
+ }
+
+ @Override
+ public String getSchema() {
+ return SCHEMA;
}
public static HadoopConf buildWithConfig(Config config) {
@@ -43,7 +49,6 @@ public class FtpConf extends HadoopConf {
HashMap<String, String> ftpOptions = new HashMap<>();
ftpOptions.put("fs.ftp.user." + host,
config.getString(FtpConfig.FTP_USERNAME));
ftpOptions.put("fs.ftp.password." + host,
config.getString(FtpConfig.FTP_PASSWORD));
- ftpOptions.put("fs.ftp.impl", hadoopConf.getFsHdfsImpl());
hadoopConf.setExtraOptions(ftpOptions);
return hadoopConf;
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/config/LocalConf.java
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/config/LocalConf.java
index 918b8e486..b1f3f3582 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/config/LocalConf.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/config/LocalConf.java
@@ -20,9 +20,20 @@ package
org.apache.seatunnel.connectors.seatunnel.file.local.config;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
public class LocalConf extends HadoopConf {
- private final String fsHdfsImpl = "org.apache.hadoop.fs.LocalFileSystem";
+ private static final String HDFS_IMPL =
"org.apache.hadoop.fs.LocalFileSystem";
+ private static final String SCHEMA = "file";
public LocalConf(String hdfsNameKey) {
super(hdfsNameKey);
}
+
+ @Override
+ public String getFsHdfsImpl() {
+ return HDFS_IMPL;
+ }
+
+ @Override
+ public String getSchema() {
+ return SCHEMA;
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/config/OssConf.java
b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/config/OssConf.java
index 96fa483f2..ac774ec6e 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/config/OssConf.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/config/OssConf.java
@@ -26,11 +26,17 @@ import org.apache.hadoop.fs.aliyun.oss.Constants;
import java.util.HashMap;
public class OssConf extends HadoopConf {
- private final String fsHdfsImpl =
"org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem";
+ private static final String HDFS_IMPL =
"org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem";
+ private static final String SCHEMA = "oss";
@Override
public String getFsHdfsImpl() {
- return fsHdfsImpl;
+ return HDFS_IMPL;
+ }
+
+ @Override
+ public String getSchema() {
+ return SCHEMA;
}
public OssConf(String hdfsNameKey) {
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/config/S3Conf.java
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/config/S3Conf.java
index 728ff14fa..1fcc354e1 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/config/S3Conf.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/config/S3Conf.java
@@ -24,11 +24,17 @@ import
org.apache.seatunnel.shade.com.typesafe.config.Config;
import java.util.HashMap;
public class S3Conf extends HadoopConf {
- private final String fsHdfsImpl =
"org.apache.hadoop.fs.s3native.NativeS3FileSystem";
+ private static final String HDFS_IMPL =
"org.apache.hadoop.fs.s3native.NativeS3FileSystem";
+ private static final String SCHEMA = "s3n";
@Override
public String getFsHdfsImpl() {
- return fsHdfsImpl;
+ return HDFS_IMPL;
+ }
+
+ @Override
+ public String getSchema() {
+ return SCHEMA;
}
private S3Conf(String hdfsNameKey) {
@@ -40,7 +46,6 @@ public class S3Conf extends HadoopConf {
HashMap<String, String> s3Options = new HashMap<>();
s3Options.put("fs.s3n.awsAccessKeyId",
config.getString(S3Config.S3_ACCESS_KEY));
s3Options.put("fs.s3n.awsSecretAccessKey",
config.getString(S3Config.S3_SECRET_KEY));
- s3Options.put("fs.s3n.impl",
"org.apache.hadoop.fs.s3native.NativeS3FileSystem");
hadoopConf.setExtraOptions(s3Options);
return hadoopConf;
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/config/SftpConf.java
b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/config/SftpConf.java
index b210af878..5e9501b04 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/config/SftpConf.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/config/SftpConf.java
@@ -24,11 +24,23 @@ import
org.apache.seatunnel.shade.com.typesafe.config.Config;
import java.util.HashMap;
public class SftpConf extends HadoopConf {
+ private static final String HDFS_IMPL =
"org.apache.seatunnel.connectors.seatunnel.file.sftp.system.SFTPFileSystem";
+ private static final String SCHEMA = "sftp";
private SftpConf(String hdfsNameKey) {
super(hdfsNameKey);
}
+ @Override
+ public String getFsHdfsImpl() {
+ return HDFS_IMPL;
+ }
+
+ @Override
+ public String getSchema() {
+ return SCHEMA;
+ }
+
public static HadoopConf buildWithConfig(Config config) {
String host = config.getString(SftpConfig.SFTP_HOST);
int port = config.getInt(SftpConfig.SFTP_PORT);
@@ -37,7 +49,6 @@ public class SftpConf extends HadoopConf {
HashMap<String, String> sftpOptions = new HashMap<>();
sftpOptions.put("fs.sftp.user." + host,
config.getString(SftpConfig.SFTP_USERNAME));
sftpOptions.put("fs.sftp.password." + host + "." +
config.getString(SftpConfig.SFTP_USERNAME),
config.getString(SftpConfig.SFTP_PASSWORD));
- sftpOptions.put("fs.sftp.impl",
"org.apache.seatunnel.connectors.seatunnel.file.sftp.system.SFTPFileSystem");
hadoopConf.setExtraOptions(sftpOptions);
return hadoopConf;
}