This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new dd5c35388 [Improve][Connector-V2][File] Improve code structure (#3238)
dd5c35388 is described below

commit dd5c353881eddaeb0fb0b3206429ab74b4aeb789
Author: Tyrantlucifer <[email protected]>
AuthorDate: Wed Nov 9 17:22:26 2022 +0800

    [Improve][Connector-V2][File] Improve code structure (#3238)
    
    * [Improve][Connector-V2][File] Improve code structure
    
    * [Improve][Connector-V2][File] Fix unit tests
    
    * [Improve][Connector-V2][File] Fix NPE
    
    * [Feature][Connector-V2][Hive] Fix file e2e
    
    * [Hotfix][Connector-V2][File] Fix file e2e
---
 .../seatunnel/file/config/HadoopConf.java          | 11 ++++++++-
 .../file/sink/writer/AbstractWriteStrategy.java    | 12 ++++------
 .../file/source/reader/AbstractReadStrategy.java   | 11 +++------
 .../file/source/reader/ParquetReadStrategy.java    |  2 +-
 .../seatunnel/file/writer/OrcReadStrategyTest.java | 24 ++++++++++++++++++-
 .../file/writer/ParquetReadStrategyTest.java       | 27 ++++++++++++++++++++--
 .../seatunnel/file/ftp/config/FtpConf.java         | 11 ++++++---
 .../seatunnel/file/local/config/LocalConf.java     | 13 ++++++++++-
 .../seatunnel/file/oss/config/OssConf.java         | 10 ++++++--
 .../seatunnel/file/s3/config/S3Conf.java           | 11 ++++++---
 .../seatunnel/file/sftp/config/SftpConf.java       | 13 ++++++++++-
 11 files changed, 115 insertions(+), 30 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java
index 6da660421..2c59d46ee 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java
@@ -26,14 +26,23 @@ import java.util.Map;
 
 @Data
 public class HadoopConf implements Serializable {
+    private static final String HDFS_IMPL = "org.apache.hadoop.hdfs.DistributedFileSystem";
+    private static final String SCHEMA = "hdfs";
     protected Map<String, String> extraOptions = new HashMap<>();
     protected String hdfsNameKey;
-    protected String fsHdfsImpl = "org.apache.hadoop.hdfs.DistributedFileSystem";
 
     public HadoopConf(String hdfsNameKey) {
         this.hdfsNameKey = hdfsNameKey;
     }
 
+    public String getFsHdfsImpl() {
+        return HDFS_IMPL;
+    }
+
+    public String getSchema() {
+        return SCHEMA;
+    }
+
     public void setExtraOptionsForConfiguration(Configuration configuration) {
         if (!extraOptions.isEmpty()) {
             extraOptions.forEach(configuration::set);
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
index b154c7cbc..524c01593 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
@@ -94,21 +94,19 @@ public abstract class AbstractWriteStrategy implements WriteStrategy {
     /**
      * use hadoop conf generate hadoop configuration
      *
-     * @param conf hadoop conf
+     * @param hadoopConf hadoop conf
      * @return Configuration
      */
     @Override
-    public Configuration getConfiguration(HadoopConf conf) {
+    public Configuration getConfiguration(HadoopConf hadoopConf) {
         Configuration configuration = new Configuration();
         configuration.setBoolean(READ_INT96_AS_FIXED, true);
         configuration.setBoolean(WRITE_FIXED_AS_INT96, true);
         configuration.setBoolean(ADD_LIST_ELEMENT_RECORDS, false);
         configuration.setBoolean(WRITE_OLD_LIST_STRUCTURE, false);
-        if (hadoopConf != null) {
-            configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, hadoopConf.getHdfsNameKey());
-            configuration.set("fs.hdfs.impl", hadoopConf.getFsHdfsImpl());
-            hadoopConf.setExtraOptionsForConfiguration(configuration);
-        }
+        configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, hadoopConf.getHdfsNameKey());
+        configuration.set(String.format("fs.%s.impl", hadoopConf.getSchema()), hadoopConf.getFsHdfsImpl());
+        this.hadoopConf.setExtraOptionsForConfiguration(configuration);
         return configuration;
     }
 
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
index bdcd2e029..f8288a71f 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
@@ -87,18 +87,13 @@ public abstract class AbstractReadStrategy implements ReadStrategy {
         configuration.setBoolean(WRITE_FIXED_AS_INT96, true);
         configuration.setBoolean(ADD_LIST_ELEMENT_RECORDS, false);
         configuration.setBoolean(WRITE_OLD_LIST_STRUCTURE, false);
-        if (hadoopConf != null) {
-            configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, hadoopConf.getHdfsNameKey());
-            configuration.set("fs.hdfs.impl", hadoopConf.getFsHdfsImpl());
-            hadoopConf.setExtraOptionsForConfiguration(configuration);
-        }
+        configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, hadoopConf.getHdfsNameKey());
+        configuration.set(String.format("fs.%s.impl", hadoopConf.getSchema()), hadoopConf.getFsHdfsImpl());
+        hadoopConf.setExtraOptionsForConfiguration(configuration);
         return configuration;
     }
 
     Configuration getConfiguration() throws FilePluginException {
-        if (null == hadoopConf) {
-            log.info("Local file reader didn't need hadoopConf");
-        }
         return getConfiguration(hadoopConf);
     }
 
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
index 52df095a4..212f79821 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
@@ -202,7 +202,7 @@ public class ParquetReadStrategy extends AbstractReadStrategy {
         Path filePath = new Path(path);
         ParquetMetadata metadata;
         try {
-            HadoopInputFile hadoopInputFile = HadoopInputFile.fromPath(filePath, getConfiguration());
+            HadoopInputFile hadoopInputFile = HadoopInputFile.fromPath(filePath, getConfiguration(hadoopConf));
             ParquetFileReader reader = ParquetFileReader.open(hadoopInputFile);
             metadata = reader.getFooter();
             reader.close();
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcReadStrategyTest.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcReadStrategyTest.java
index dc32fbe22..454bf907f 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcReadStrategyTest.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcReadStrategyTest.java
@@ -17,8 +17,11 @@
 
 package org.apache.seatunnel.connectors.seatunnel.file.writer;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT;
+
 import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import org.apache.seatunnel.connectors.seatunnel.file.source.reader.OrcReadStrategy;
 
 import org.junit.jupiter.api.Assertions;
@@ -35,7 +38,8 @@ public class OrcReadStrategyTest {
         assert resource != null;
         String path = Paths.get(resource.toURI()).toString();
         OrcReadStrategy orcReadStrategy = new OrcReadStrategy();
-        orcReadStrategy.init(null);
+        LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+        orcReadStrategy.init(localConf);
         TestCollector testCollector = new TestCollector();
         orcReadStrategy.read(path, testCollector);
     }
@@ -55,4 +59,22 @@ public class OrcReadStrategyTest {
         }
     }
 
+    public static class LocalConf extends HadoopConf {
+        private static final String HDFS_IMPL = "org.apache.hadoop.fs.LocalFileSystem";
+        private static final String SCHEMA = "file";
+
+        public LocalConf(String hdfsNameKey) {
+            super(hdfsNameKey);
+        }
+
+        @Override
+        public String getFsHdfsImpl() {
+            return HDFS_IMPL;
+        }
+
+        @Override
+        public String getSchema() {
+            return SCHEMA;
+        }
+    }
 }
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
index c86d54e61..acb8cf38d 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
@@ -17,9 +17,12 @@
 
 package org.apache.seatunnel.connectors.seatunnel.file.writer;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT;
+
 import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ParquetReadStrategy;
 
 import org.junit.jupiter.api.Test;
@@ -34,8 +37,9 @@ public class ParquetReadStrategyTest {
         assert resource != null;
         String path = Paths.get(resource.toURI()).toString();
         ParquetReadStrategy parquetReadStrategy = new ParquetReadStrategy();
-        parquetReadStrategy.init(null);
-        SeaTunnelRowType seaTunnelRowTypeInfo = parquetReadStrategy.getSeaTunnelRowTypeInfo(null, path);
+        LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+        parquetReadStrategy.init(localConf);
+        SeaTunnelRowType seaTunnelRowTypeInfo = parquetReadStrategy.getSeaTunnelRowTypeInfo(localConf, path);
         assert seaTunnelRowTypeInfo != null;
         TestCollector testCollector = new TestCollector();
         parquetReadStrategy.read(path, testCollector);
@@ -54,4 +58,23 @@ public class ParquetReadStrategyTest {
             return null;
         }
     }
+
+    public static class LocalConf extends HadoopConf {
+        private static final String HDFS_IMPL = "org.apache.hadoop.fs.LocalFileSystem";
+        private static final String SCHEMA = "file";
+
+        public LocalConf(String hdfsNameKey) {
+            super(hdfsNameKey);
+        }
+
+        @Override
+        public String getFsHdfsImpl() {
+            return HDFS_IMPL;
+        }
+
+        @Override
+        public String getSchema() {
+            return SCHEMA;
+        }
+    }
 }
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FtpConf.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FtpConf.java
index add437bcd..300532ac0 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FtpConf.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FtpConf.java
@@ -24,7 +24,8 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import java.util.HashMap;
 
 public class FtpConf extends HadoopConf {
-    protected String fsHdfsImpl = "org.apache.seatunnel.connectors.seatunnel.file.ftp.system.SeaTunnelFTPFileSystem";
+    private static final String HDFS_IMPL = "org.apache.seatunnel.connectors.seatunnel.file.ftp.system.SeaTunnelFTPFileSystem";
+    private static final String SCHEMA = "ftp";
 
     private FtpConf(String hdfsNameKey) {
         super(hdfsNameKey);
@@ -32,7 +33,12 @@ public class FtpConf extends HadoopConf {
 
     @Override
     public String getFsHdfsImpl() {
-        return fsHdfsImpl;
+        return HDFS_IMPL;
+    }
+
+    @Override
+    public String getSchema() {
+        return SCHEMA;
     }
 
     public static HadoopConf buildWithConfig(Config config) {
@@ -43,7 +49,6 @@ public class FtpConf extends HadoopConf {
         HashMap<String, String> ftpOptions = new HashMap<>();
         ftpOptions.put("fs.ftp.user." + host, config.getString(FtpConfig.FTP_USERNAME));
         ftpOptions.put("fs.ftp.password." + host, config.getString(FtpConfig.FTP_PASSWORD));
-        ftpOptions.put("fs.ftp.impl", hadoopConf.getFsHdfsImpl());
         hadoopConf.setExtraOptions(ftpOptions);
         return hadoopConf;
     }
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/config/LocalConf.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/config/LocalConf.java
index 918b8e486..b1f3f3582 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/config/LocalConf.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/config/LocalConf.java
@@ -20,9 +20,20 @@ package org.apache.seatunnel.connectors.seatunnel.file.local.config;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 
 public class LocalConf extends HadoopConf {
-    private final String fsHdfsImpl = "org.apache.hadoop.fs.LocalFileSystem";
+    private static final String HDFS_IMPL = "org.apache.hadoop.fs.LocalFileSystem";
+    private static final String SCHEMA = "file";
 
     public LocalConf(String hdfsNameKey) {
         super(hdfsNameKey);
     }
+
+    @Override
+    public String getFsHdfsImpl() {
+        return HDFS_IMPL;
+    }
+
+    @Override
+    public String getSchema() {
+        return SCHEMA;
+    }
 }
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/config/OssConf.java b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/config/OssConf.java
index 96fa483f2..ac774ec6e 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/config/OssConf.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/config/OssConf.java
@@ -26,11 +26,17 @@ import org.apache.hadoop.fs.aliyun.oss.Constants;
 import java.util.HashMap;
 
 public class OssConf extends HadoopConf {
-    private final String fsHdfsImpl = "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem";
+    private static final String HDFS_IMPL = "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem";
+    private static final String SCHEMA = "oss";
 
     @Override
     public String getFsHdfsImpl() {
-        return fsHdfsImpl;
+        return HDFS_IMPL;
+    }
+
+    @Override
+    public String getSchema() {
+        return SCHEMA;
     }
 
     public OssConf(String hdfsNameKey) {
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/config/S3Conf.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/config/S3Conf.java
index 728ff14fa..1fcc354e1 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/config/S3Conf.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/config/S3Conf.java
@@ -24,11 +24,17 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import java.util.HashMap;
 
 public class S3Conf extends HadoopConf {
-    private final String fsHdfsImpl = "org.apache.hadoop.fs.s3native.NativeS3FileSystem";
+    private static final String HDFS_IMPL = "org.apache.hadoop.fs.s3native.NativeS3FileSystem";
+    private static final String SCHEMA = "s3n";
 
     @Override
     public String getFsHdfsImpl() {
-        return fsHdfsImpl;
+        return HDFS_IMPL;
+    }
+
+    @Override
+    public String getSchema() {
+        return SCHEMA;
     }
 
     private S3Conf(String hdfsNameKey) {
@@ -40,7 +46,6 @@ public class S3Conf extends HadoopConf {
         HashMap<String, String> s3Options = new HashMap<>();
         s3Options.put("fs.s3n.awsAccessKeyId", config.getString(S3Config.S3_ACCESS_KEY));
         s3Options.put("fs.s3n.awsSecretAccessKey", config.getString(S3Config.S3_SECRET_KEY));
-        s3Options.put("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
         hadoopConf.setExtraOptions(s3Options);
         return hadoopConf;
     }
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/config/SftpConf.java b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/config/SftpConf.java
index b210af878..5e9501b04 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/config/SftpConf.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/config/SftpConf.java
@@ -24,11 +24,23 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import java.util.HashMap;
 
 public class SftpConf extends HadoopConf {
+    private static final String HDFS_IMPL = "org.apache.seatunnel.connectors.seatunnel.file.sftp.system.SFTPFileSystem";
+    private static final String SCHEMA = "sftp";
 
     private SftpConf(String hdfsNameKey) {
         super(hdfsNameKey);
     }
 
+    @Override
+    public String getFsHdfsImpl() {
+        return HDFS_IMPL;
+    }
+
+    @Override
+    public String getSchema() {
+        return SCHEMA;
+    }
+
     public static HadoopConf buildWithConfig(Config config) {
         String host = config.getString(SftpConfig.SFTP_HOST);
         int port = config.getInt(SftpConfig.SFTP_PORT);
@@ -37,7 +49,6 @@ public class SftpConf extends HadoopConf {
         HashMap<String, String> sftpOptions = new HashMap<>();
         sftpOptions.put("fs.sftp.user." + host, config.getString(SftpConfig.SFTP_USERNAME));
         sftpOptions.put("fs.sftp.password." + host + "." + config.getString(SftpConfig.SFTP_USERNAME), config.getString(SftpConfig.SFTP_PASSWORD));
-        sftpOptions.put("fs.sftp.impl", "org.apache.seatunnel.connectors.seatunnel.file.sftp.system.SFTPFileSystem");
         hadoopConf.setExtraOptions(sftpOptions);
         return hadoopConf;
     }

Reply via email to