This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 1f58f224a0 [Feature][Connector-V2][File] Support read empty directory (#5591)
1f58f224a0 is described below
commit 1f58f224a0c10df8ce9b719a4c1f42de866e9581
Author: Tyrantlucifer <[email protected]>
AuthorDate: Wed Oct 11 10:57:19 2023 +0800
[Feature][Connector-V2][File] Support read empty directory (#5591)
---
.../file/hdfs/source/BaseHdfsFileSource.java | 5 +++
.../file/source/reader/AbstractReadStrategy.java | 10 ------
.../seatunnel/file/cos/source/CosFileSource.java | 5 +++
.../seatunnel/file/ftp/source/FtpFileSource.java | 5 +++
.../seatunnel/file/oss/source/OssFileSource.java | 5 +++
.../file/local/source/LocalFileSource.java | 5 +++
.../seatunnel/file/oss/source/OssFileSource.java | 5 +++
.../seatunnel/file/s3/source/S3FileSource.java | 5 +++
.../seatunnel/file/sftp/source/SftpFileSource.java | 5 +++
.../e2e/connector/file/local/LocalFileIT.java | 5 +++
.../test/resources/json/local_file_to_console.conf | 37 ++++++++++++++++++++++
.../resources/parquet/local_file_to_console.conf | 37 ++++++++++++++++++++++
12 files changed, 119 insertions(+), 10 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
index 987b861d3f..60812cd6cf 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
@@ -111,6 +111,11 @@ public abstract class BaseHdfsFileSource extends
BaseFileSource {
"SeaTunnel does not supported this file format");
}
} else {
+ if (filePaths.isEmpty()) {
+ // When the directory is empty, distribute default behavior schema
+ rowType = CatalogTableUtil.buildSimpleTextSchema();
+ return;
+ }
try {
rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf,
filePaths.get(0));
} catch (FileConnectorException e) {
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
index e4e1694f30..7de0a242cc 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
@@ -24,8 +24,6 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
-import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
-import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
import org.apache.hadoop.conf.Configuration;
@@ -154,14 +152,6 @@ public abstract class AbstractReadStrategy implements
ReadStrategy {
}
}
- if (fileNames.isEmpty()) {
- throw new FileConnectorException(
- FileConnectorErrorCode.FILE_LIST_EMPTY,
- "The target file list is empty,"
- + "SeaTunnel will not be able to sync empty table, "
- + "please check the configuration parameters such as: [file_filter_pattern]");
- }
-
return fileNames;
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSource.java
b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSource.java
index f874666c63..ec2c65eede 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSource.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSource.java
@@ -107,6 +107,11 @@ public class CosFileSource extends BaseFileSource {
"SeaTunnel does not supported this file format");
}
} else {
+ if (filePaths.isEmpty()) {
+ // When the directory is empty, distribute default behavior schema
+ rowType = CatalogTableUtil.buildSimpleTextSchema();
+ return;
+ }
try {
rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf,
filePaths.get(0));
} catch (FileConnectorException e) {
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
index e396e2834e..6abd33fb4b 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
@@ -112,6 +112,11 @@ public class FtpFileSource extends BaseFileSource {
"SeaTunnel does not supported this file format");
}
} else {
+ if (filePaths.isEmpty()) {
+ // When the directory is empty, distribute default behavior schema
+ rowType = CatalogTableUtil.buildSimpleTextSchema();
+ return;
+ }
try {
rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf,
filePaths.get(0));
} catch (FileConnectorException e) {
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
b/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
index 7714e62bf3..b3a9d1593b 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
@@ -108,6 +108,11 @@ public class OssFileSource extends BaseFileSource {
"SeaTunnel does not supported this file format");
}
} else {
+ if (filePaths.isEmpty()) {
+ // When the directory is empty, distribute default behavior schema
+ rowType = CatalogTableUtil.buildSimpleTextSchema();
+ return;
+ }
try {
rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf,
filePaths.get(0));
} catch (FileConnectorException e) {
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
index d7ba8d7224..7a8c5a6cc7 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
@@ -109,6 +109,11 @@ public class LocalFileSource extends BaseFileSource {
"SeaTunnel does not supported this file format");
}
} else {
+ if (filePaths.isEmpty()) {
+ // When the directory is empty, distribute default behavior schema
+ rowType = CatalogTableUtil.buildSimpleTextSchema();
+ return;
+ }
try {
rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf,
filePaths.get(0));
} catch (FileConnectorException e) {
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
index 3f8144c7d3..e5d7fba773 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
@@ -107,6 +107,11 @@ public class OssFileSource extends BaseFileSource {
"SeaTunnel does not supported this file format");
}
} else {
+ if (filePaths.isEmpty()) {
+ // When the directory is empty, distribute default behavior schema
+ rowType = CatalogTableUtil.buildSimpleTextSchema();
+ return;
+ }
try {
rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf,
filePaths.get(0));
} catch (FileConnectorException e) {
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java
index fb3eb848a1..63e3af6221 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java
@@ -104,6 +104,11 @@ public class S3FileSource extends BaseFileSource {
"SeaTunnel does not supported this file format");
}
} else {
+ if (filePaths.isEmpty()) {
+ // When the directory is empty, distribute default behavior schema
+ rowType = CatalogTableUtil.buildSimpleTextSchema();
+ return;
+ }
try {
rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf,
filePaths.get(0));
} catch (FileConnectorException e) {
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java
b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java
index e7d2c86f5f..0d195da073 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java
@@ -112,6 +112,11 @@ public class SftpFileSource extends BaseFileSource {
"SeaTunnel does not supported this file format");
}
} else {
+ if (filePaths.isEmpty()) {
+ // When the directory is empty, distribute default behavior schema
+ rowType = CatalogTableUtil.buildSimpleTextSchema();
+ return;
+ }
try {
rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf,
filePaths.get(0));
} catch (FileConnectorException e) {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
index aed3576726..c454c6ce2c 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
@@ -69,6 +69,7 @@ public class LocalFileIT extends TestSuiteBase {
"/excel/e2e.xlsx",
"/seatunnel/read/excel_filter/name=tyrantlucifer/hobby=coding/e2e_filter.xlsx",
container);
+ container.execInContainer("mkdir", "-p", "/tmp/fake_empty");
};
@TestTemplate
@@ -105,5 +106,9 @@ public class LocalFileIT extends TestSuiteBase {
helper.execute("/parquet/local_file_parquet_projection_to_assert.conf");
// test read filtered local file
helper.execute("/excel/local_filter_excel_to_assert.conf");
+
+ // test read empty directory
+ helper.execute("/json/local_file_to_console.conf");
+ helper.execute("/parquet/local_file_to_console.conf");
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_to_console.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_to_console.conf
new file mode 100644
index 0000000000..4595f83888
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_to_console.conf
@@ -0,0 +1,37 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+ job.mode = "BATCH"
+}
+
+source {
+ LocalFile {
+ path = "/tmp/fake_empty"
+ file_format_type = "json"
+ }
+}
+
+sink {
+ Console {}
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/local_file_to_console.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/local_file_to_console.conf
new file mode 100644
index 0000000000..ee3bff3fb9
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/local_file_to_console.conf
@@ -0,0 +1,37 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+ job.mode = "BATCH"
+}
+
+source {
+ LocalFile {
+ path = "/tmp/fake_empty"
+ file_format_type = "parquet"
+ }
+}
+
+sink {
+ Console {}
+}
\ No newline at end of file