This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 1f58f224a0 [Feature][Connector-V2][File] Support read empty directory (#5591)
1f58f224a0 is described below

commit 1f58f224a0c10df8ce9b719a4c1f42de866e9581
Author: Tyrantlucifer <[email protected]>
AuthorDate: Wed Oct 11 10:57:19 2023 +0800

    [Feature][Connector-V2][File] Support read empty directory (#5591)
---
 .../file/hdfs/source/BaseHdfsFileSource.java       |  5 +++
 .../file/source/reader/AbstractReadStrategy.java   | 10 ------
 .../seatunnel/file/cos/source/CosFileSource.java   |  5 +++
 .../seatunnel/file/ftp/source/FtpFileSource.java   |  5 +++
 .../seatunnel/file/oss/source/OssFileSource.java   |  5 +++
 .../file/local/source/LocalFileSource.java         |  5 +++
 .../seatunnel/file/oss/source/OssFileSource.java   |  5 +++
 .../seatunnel/file/s3/source/S3FileSource.java     |  5 +++
 .../seatunnel/file/sftp/source/SftpFileSource.java |  5 +++
 .../e2e/connector/file/local/LocalFileIT.java      |  5 +++
 .../test/resources/json/local_file_to_console.conf | 37 ++++++++++++++++++++++
 .../resources/parquet/local_file_to_console.conf   | 37 ++++++++++++++++++++++
 12 files changed, 119 insertions(+), 10 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
index 987b861d3f..60812cd6cf 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
@@ -111,6 +111,11 @@ public abstract class BaseHdfsFileSource extends BaseFileSource {
                             "SeaTunnel does not supported this file format");
             }
         } else {
+            if (filePaths.isEmpty()) {
+                // When the directory is empty, distribute default behavior schema
+                rowType = CatalogTableUtil.buildSimpleTextSchema();
+                return;
+            }
             try {
                 rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf, filePaths.get(0));
             } catch (FileConnectorException e) {
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
index e4e1694f30..7de0a242cc 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
@@ -24,8 +24,6 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
-import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
-import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
 
 import org.apache.hadoop.conf.Configuration;
@@ -154,14 +152,6 @@ public abstract class AbstractReadStrategy implements ReadStrategy {
             }
         }
 
-        if (fileNames.isEmpty()) {
-            throw new FileConnectorException(
-                    FileConnectorErrorCode.FILE_LIST_EMPTY,
-                    "The target file list is empty,"
-                            + "SeaTunnel will not be able to sync empty table, "
-                            + "please check the configuration parameters such as: [file_filter_pattern]");
-        }
-
         return fileNames;
     }
 
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSource.java
index f874666c63..ec2c65eede 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSource.java
@@ -107,6 +107,11 @@ public class CosFileSource extends BaseFileSource {
                             "SeaTunnel does not supported this file format");
             }
         } else {
+            if (filePaths.isEmpty()) {
+                // When the directory is empty, distribute default behavior schema
+                rowType = CatalogTableUtil.buildSimpleTextSchema();
+                return;
+            }
             try {
                 rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf, filePaths.get(0));
             } catch (FileConnectorException e) {
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
index e396e2834e..6abd33fb4b 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
@@ -112,6 +112,11 @@ public class FtpFileSource extends BaseFileSource {
                             "SeaTunnel does not supported this file format");
             }
         } else {
+            if (filePaths.isEmpty()) {
+                // When the directory is empty, distribute default behavior schema
+                rowType = CatalogTableUtil.buildSimpleTextSchema();
+                return;
+            }
             try {
                 rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf, filePaths.get(0));
             } catch (FileConnectorException e) {
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
index 7714e62bf3..b3a9d1593b 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
@@ -108,6 +108,11 @@ public class OssFileSource extends BaseFileSource {
                             "SeaTunnel does not supported this file format");
             }
         } else {
+            if (filePaths.isEmpty()) {
+                // When the directory is empty, distribute default behavior schema
+                rowType = CatalogTableUtil.buildSimpleTextSchema();
+                return;
+            }
             try {
                 rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf, filePaths.get(0));
             } catch (FileConnectorException e) {
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
index d7ba8d7224..7a8c5a6cc7 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
@@ -109,6 +109,11 @@ public class LocalFileSource extends BaseFileSource {
                             "SeaTunnel does not supported this file format");
             }
         } else {
+            if (filePaths.isEmpty()) {
+                // When the directory is empty, distribute default behavior schema
+                rowType = CatalogTableUtil.buildSimpleTextSchema();
+                return;
+            }
             try {
                 rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf, filePaths.get(0));
             } catch (FileConnectorException e) {
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
index 3f8144c7d3..e5d7fba773 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
@@ -107,6 +107,11 @@ public class OssFileSource extends BaseFileSource {
                             "SeaTunnel does not supported this file format");
             }
         } else {
+            if (filePaths.isEmpty()) {
+                // When the directory is empty, distribute default behavior schema
+                rowType = CatalogTableUtil.buildSimpleTextSchema();
+                return;
+            }
             try {
                 rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf, filePaths.get(0));
             } catch (FileConnectorException e) {
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java
index fb3eb848a1..63e3af6221 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java
@@ -104,6 +104,11 @@ public class S3FileSource extends BaseFileSource {
                             "SeaTunnel does not supported this file format");
             }
         } else {
+            if (filePaths.isEmpty()) {
+                // When the directory is empty, distribute default behavior schema
+                rowType = CatalogTableUtil.buildSimpleTextSchema();
+                return;
+            }
             try {
                 rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf, filePaths.get(0));
             } catch (FileConnectorException e) {
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java
index e7d2c86f5f..0d195da073 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java
@@ -112,6 +112,11 @@ public class SftpFileSource extends BaseFileSource {
                             "SeaTunnel does not supported this file format");
             }
         } else {
+            if (filePaths.isEmpty()) {
+                // When the directory is empty, distribute default behavior schema
+                rowType = CatalogTableUtil.buildSimpleTextSchema();
+                return;
+            }
             try {
                 rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf, filePaths.get(0));
             } catch (FileConnectorException e) {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
index aed3576726..c454c6ce2c 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
@@ -69,6 +69,7 @@ public class LocalFileIT extends TestSuiteBase {
                         "/excel/e2e.xlsx",
                         "/seatunnel/read/excel_filter/name=tyrantlucifer/hobby=coding/e2e_filter.xlsx",
                         container);
+                container.execInContainer("mkdir", "-p", "/tmp/fake_empty");
             };
 
     @TestTemplate
@@ -105,5 +106,9 @@ public class LocalFileIT extends TestSuiteBase {
         helper.execute("/parquet/local_file_parquet_projection_to_assert.conf");
         // test read filtered local file
         helper.execute("/excel/local_filter_excel_to_assert.conf");
+
+        // test read empty directory
+        helper.execute("/json/local_file_to_console.conf");
+        helper.execute("/parquet/local_file_to_console.conf");
     }
 }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_to_console.conf
new file mode 100644
index 0000000000..4595f83888
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_to_console.conf
@@ -0,0 +1,37 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+  job.mode = "BATCH"
+}
+
+source {
+  LocalFile {
+    path = "/tmp/fake_empty"
+    file_format_type = "json"
+  }
+}
+
+sink {
+  Console {}
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/local_file_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/local_file_to_console.conf
new file mode 100644
index 0000000000..ee3bff3fb9
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/local_file_to_console.conf
@@ -0,0 +1,37 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+  job.mode = "BATCH"
+}
+
+source {
+  LocalFile {
+    path = "/tmp/fake_empty"
+    file_format_type = "parquet"
+  }
+}
+
+sink {
+  Console {}
+}
\ No newline at end of file

Reply via email to