This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 86ae9272c4 [Fix][Connector-V2] Fix file binary format sync convert 
directory to file (#7942)
86ae9272c4 is described below

commit 86ae9272c42dc0e64b88e02bea397a0974b556f2
Author: Jast <[email protected]>
AuthorDate: Thu Oct 31 13:55:23 2024 +0800

    [Fix][Connector-V2] Fix file binary format sync convert directory to file 
(#7942)
---
 .github/workflows/backend.yml                      |  2 +-
 .../file/hadoop/HadoopFileSystemProxy.java         |  4 ++
 .../file/source/reader/BinaryReadStrategy.java     |  2 +-
 .../e2e/connector/file/ftp/FtpFileIT.java          | 19 +++++++++
 .../test/resources/text/ftp_to_ftp_for_binary.conf | 47 ++++++++++++++++++++++
 5 files changed, 72 insertions(+), 2 deletions(-)

diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml
index 39efd998da..67b2511f10 100644
--- a/.github/workflows/backend.yml
+++ b/.github/workflows/backend.yml
@@ -364,7 +364,7 @@ jobs:
       matrix:
         java: [ '8', '11' ]
         os: [ 'ubuntu-latest' ]
-    timeout-minutes: 120
+    timeout-minutes: 180
     steps:
       - uses: actions/checkout@v2
       - name: Set up JDK ${{ matrix.java }}
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopFileSystemProxy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopFileSystemProxy.java
index d4f1791a4e..8f8aef0548 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopFileSystemProxy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopFileSystemProxy.java
@@ -64,6 +64,10 @@ public class HadoopFileSystemProxy implements Serializable, 
Closeable {
         return execute(() -> getFileSystem().exists(new Path(filePath)));
     }
 
+    public boolean isFile(@NonNull String filePath) throws IOException {
+        return execute(() -> getFileSystem().getFileStatus(new 
Path(filePath)).isFile());
+    }
+
     public void createFile(@NonNull String filePath) throws IOException {
         execute(
                 () -> {
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/BinaryReadStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/BinaryReadStrategy.java
index 3bbb90c774..7849415b32 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/BinaryReadStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/BinaryReadStrategy.java
@@ -55,7 +55,7 @@ public class BinaryReadStrategy extends AbstractReadStrategy {
             throws IOException, FileConnectorException {
         try (InputStream inputStream = 
hadoopFileSystemProxy.getInputStream(path)) {
             String relativePath;
-            if (basePath.isFile()) {
+            if (hadoopFileSystemProxy.isFile(basePath.getAbsolutePath())) {
                 relativePath = basePath.getName();
             } else {
                 relativePath =
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
index 7b73c3ee70..26c32f247c 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
@@ -187,6 +187,25 @@ public class FtpFileIT extends TestSuiteBase implements 
TestResource {
         deleteFileFromContainer(homePath);
     }
 
+    @TestTemplate
+    public void testFtpToFtpForBinary(TestContainer container)
+            throws IOException, InterruptedException {
+
+        Container.ExecResult execResult = 
container.executeJob("/text/ftp_to_ftp_for_binary.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
+
+        String homePath = "/home/vsftpd/seatunnel/uploads/seatunnel";
+        Assertions.assertEquals(1, getFileListFromContainer(homePath).size());
+
+        // Confirm data is written correctly
+        Container.ExecResult resultExecResult =
+                ftpContainer.execInContainer(
+                        "sh", "-c", "awk 'END {print NR}' " + homePath + 
"/e2e.txt");
+        Assertions.assertEquals("5", resultExecResult.getStdout().trim());
+
+        deleteFileFromContainer(homePath);
+    }
+
     private void assertJobExecution(TestContainer container, String 
configPath, List<String> params)
             throws IOException, InterruptedException {
         Container.ExecResult execResult = container.executeJob(configPath, 
params);
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/text/ftp_to_ftp_for_binary.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/text/ftp_to_ftp_for_binary.conf
new file mode 100644
index 0000000000..f8b8b92cd5
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/text/ftp_to_ftp_for_binary.conf
@@ -0,0 +1,47 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FtpFile {
+    host = "ftp"
+    port = 21
+    user = seatunnel
+    password = pass
+    path= "/tmp/seatunnel/read/text/name=tyrantlucifer/hobby=coding/e2e.txt"
+    file_format_type= "binary"
+    encoding = "UTF-8"
+  }
+}
+
+
+sink {
+  FtpFile {
+    host = "ftp"
+    port = 21
+    user = seatunnel
+    password = pass
+    tmp_path = "/upload-tmp/seatunnel"
+    path= "/uploads/seatunnel"
+    file_format_type= "binary"
+    encoding="UTF-8"
+  }
+}
\ No newline at end of file

Reply via email to