This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 86ae9272c4 [Fix][Connector-V2] Fix file binary format sync convert directory to file (#7942)
86ae9272c4 is described below
commit 86ae9272c42dc0e64b88e02bea397a0974b556f2
Author: Jast <[email protected]>
AuthorDate: Thu Oct 31 13:55:23 2024 +0800
[Fix][Connector-V2] Fix file binary format sync convert directory to file (#7942)
---
.github/workflows/backend.yml | 2 +-
.../file/hadoop/HadoopFileSystemProxy.java | 4 ++
.../file/source/reader/BinaryReadStrategy.java | 2 +-
.../e2e/connector/file/ftp/FtpFileIT.java | 19 +++++++++
.../test/resources/text/ftp_to_ftp_for_binary.conf | 47 ++++++++++++++++++++++
5 files changed, 72 insertions(+), 2 deletions(-)
diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml
index 39efd998da..67b2511f10 100644
--- a/.github/workflows/backend.yml
+++ b/.github/workflows/backend.yml
@@ -364,7 +364,7 @@ jobs:
matrix:
java: [ '8', '11' ]
os: [ 'ubuntu-latest' ]
- timeout-minutes: 120
+ timeout-minutes: 180
steps:
- uses: actions/checkout@v2
- name: Set up JDK ${{ matrix.java }}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopFileSystemProxy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopFileSystemProxy.java
index d4f1791a4e..8f8aef0548 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopFileSystemProxy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopFileSystemProxy.java
@@ -64,6 +64,10 @@ public class HadoopFileSystemProxy implements Serializable, Closeable {
return execute(() -> getFileSystem().exists(new Path(filePath)));
}
+ public boolean isFile(@NonNull String filePath) throws IOException {
+ return execute(() -> getFileSystem().getFileStatus(new Path(filePath)).isFile());
+ }
+
public void createFile(@NonNull String filePath) throws IOException {
execute(
() -> {
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/BinaryReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/BinaryReadStrategy.java
index 3bbb90c774..7849415b32 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/BinaryReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/BinaryReadStrategy.java
@@ -55,7 +55,7 @@ public class BinaryReadStrategy extends AbstractReadStrategy {
throws IOException, FileConnectorException {
try (InputStream inputStream = hadoopFileSystemProxy.getInputStream(path)) {
String relativePath;
- if (basePath.isFile()) {
+ if (hadoopFileSystemProxy.isFile(basePath.getAbsolutePath())) {
relativePath = basePath.getName();
} else {
relativePath =
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
index 7b73c3ee70..26c32f247c 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
@@ -187,6 +187,25 @@ public class FtpFileIT extends TestSuiteBase implements TestResource {
deleteFileFromContainer(homePath);
}
+ @TestTemplate
+ public void testFtpToFtpForBinary(TestContainer container)
+ throws IOException, InterruptedException {
+
+ Container.ExecResult execResult = container.executeJob("/text/ftp_to_ftp_for_binary.conf");
+ Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+
+ String homePath = "/home/vsftpd/seatunnel/uploads/seatunnel";
+ Assertions.assertEquals(1, getFileListFromContainer(homePath).size());
+
+ // Confirm data is written correctly
+ Container.ExecResult resultExecResult =
+ ftpContainer.execInContainer(
+ "sh", "-c", "awk 'END {print NR}' " + homePath + "/e2e.txt");
+ Assertions.assertEquals("5", resultExecResult.getStdout().trim());
+
+ deleteFileFromContainer(homePath);
+ }
+
private void assertJobExecution(TestContainer container, String configPath, List<String> params)
throws IOException, InterruptedException {
Container.ExecResult execResult = container.executeJob(configPath, params);
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/text/ftp_to_ftp_for_binary.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/text/ftp_to_ftp_for_binary.conf
new file mode 100644
index 0000000000..f8b8b92cd5
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/text/ftp_to_ftp_for_binary.conf
@@ -0,0 +1,47 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FtpFile {
+ host = "ftp"
+ port = 21
+ user = seatunnel
+ password = pass
+ path= "/tmp/seatunnel/read/text/name=tyrantlucifer/hobby=coding/e2e.txt"
+ file_format_type= "binary"
+ encoding = "UTF-8"
+ }
+}
+
+
+sink {
+ FtpFile {
+ host = "ftp"
+ port = 21
+ user = seatunnel
+ password = pass
+ tmp_path = "/upload-tmp/seatunnel"
+ path= "/uploads/seatunnel"
+ file_format_type= "binary"
+ encoding="UTF-8"
+ }
+}
\ No newline at end of file