This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new c5751b001b [Hotfix][Connector-V2][SFTP] Add quote to sftp file names
with wildcard characters (#8501)
c5751b001b is described below
commit c5751b001b57b1f12b6aab6b33a3fbfdb8ced6c0
Author: e-mhui <[email protected]>
AuthorDate: Tue Jan 21 11:03:22 2025 +0800
[Hotfix][Connector-V2][SFTP] Add quote to sftp file names with wildcard
characters (#8501)
---
.../seatunnel/file/sftp/system/SFTPFileSystem.java | 26 ++++-
.../file/sftp/system/SftpFileSystemTest.java | 41 ++++++++
.../e2e/connector/file/fstp/SftpFileIT.java | 14 +++
...ftp_file_text_wildcard_character_to_assert.conf | 117 +++++++++++++++++++++
4 files changed, 197 insertions(+), 1 deletion(-)
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/system/SFTPFileSystem.java
b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/system/SFTPFileSystem.java
index 83fccdeb3c..99bf417763 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/system/SFTPFileSystem.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/system/SFTPFileSystem.java
@@ -40,6 +40,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URLDecoder;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Vector;
@@ -154,6 +155,29 @@ public class SFTPFileSystem extends FileSystem {
}
}
+ public String quote(String path) {
+ byte[] _path = path.getBytes(StandardCharsets.UTF_8);
+ int count = 0;
+ for (int i = 0; i < _path.length; i++) {
+ byte b = _path[i];
+ if (b == '\\' || b == '?' || b == '*') {
+ count++;
+ }
+ }
+ if (count == 0) {
+ return path;
+ }
+ byte[] _path2 = new byte[_path.length + count];
+ for (int i = 0, j = 0; i < _path.length; i++) {
+ byte b = _path[i];
+ if (b == '\\' || b == '?' || b == '*') {
+ _path2[j++] = '\\';
+ }
+ _path2[j++] = b;
+ }
+ return new String(_path2, 0, _path2.length, StandardCharsets.UTF_8);
+ }
+
/**
* Convenience method, so that we don't open a new connection when using
this method from within
* another method. Otherwise every API invocation incurs the overhead of
opening/closing a TCP
@@ -466,7 +490,7 @@ public class SFTPFileSystem extends FileSystem {
// the path could be a symbolic link, so get the real path
absolute = new Path("/",
channel.realpath(absolute.toUri().getPath()));
- is = channel.get(absolute.toUri().getPath());
+ is = channel.get(quote(absolute.toUri().getPath()));
} catch (SftpException e) {
throw new IOException(e);
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/system/SftpFileSystemTest.java
b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/system/SftpFileSystemTest.java
new file mode 100644
index 0000000000..0e539350b0
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/system/SftpFileSystemTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sftp.system;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+class SftpFileSystemTest {
+
+ @Test
+ void convertAllTypeFileName() {
+ SFTPFileSystem sftpFileSystem = new SFTPFileSystem();
+ Assertions.assertEquals(
+ "/home/seatunnel/tmp/seatunnel/read/wildcard/e2e.txt",
+
sftpFileSystem.quote("/home/seatunnel/tmp/seatunnel/read/wildcard/e2e.txt"));
+ // test file name with wildcard '*'
+ Assertions.assertEquals(
+ "/home/seatunnel/tmp/seatunnel/read/wildcard/e\\*e.txt",
+
sftpFileSystem.quote("/home/seatunnel/tmp/seatunnel/read/wildcard/e*e.txt"));
+
+ // test file name with wildcard '?'
+ Assertions.assertEquals(
+ "/home/seatunnel/tmp/seatunnel/read/wildcard/e\\?e.txt",
+
sftpFileSystem.quote("/home/seatunnel/tmp/seatunnel/read/wildcard/e?e.txt"));
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java
index 2ac185aabb..235f39ae38 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java
@@ -113,6 +113,17 @@ public class SftpFileIT extends TestSuiteBase implements
TestResource {
"/home/seatunnel/tmp/seatunnel/read/xml/name=tyrantlucifer/hobby=coding/e2e.xml",
sftpContainer);
+ // Windows does not support file names containing wildcard characters, so the
+ // resource `e2e.txt` is renamed to `e*e.txt` when it is copied into the container
+ ContainerUtil.copyFileIntoContainers(
+ "/text/e2e.txt",
+ "/home/seatunnel/tmp/seatunnel/read/wildcard/e*e.txt",
+ sftpContainer);
+
+ ContainerUtil.copyFileIntoContainers(
+ "/text/e2e.txt",
+ "/home/seatunnel/tmp/seatunnel/read/wildcard/e2e.txt",
+ sftpContainer);
sftpContainer.execInContainer("sh", "-c", "chown -R seatunnel
/home/seatunnel/tmp/");
}
@@ -138,6 +149,9 @@ public class SftpFileIT extends TestSuiteBase implements
TestResource {
helper.execute("/text/sftp_file_text_projection_to_assert.conf");
// test read sftp zip text file
helper.execute("/text/sftp_file_zip_text_to_assert.conf");
+ // test read file with wildcard character, should match
tmp/seatunnel/read/wildcard/e*e.txt
+ // and tmp/seatunnel/read/wildcard/e2e.txt
+
helper.execute("/text/sftp_file_text_wildcard_character_to_assert.conf");
// test write sftp json file
helper.execute("/json/fake_to_sftp_file_json.conf");
// test read sftp json file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/text/sftp_file_text_wildcard_character_to_assert.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/text/sftp_file_text_wildcard_character_to_assert.conf
new file mode 100644
index 0000000000..cd8e27b743
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/text/sftp_file_text_wildcard_character_to_assert.conf
@@ -0,0 +1,117 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+
+ # You can set spark configuration here
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 1
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ SftpFile {
+ host = "sftp"
+ port = 22
+ user = seatunnel
+ password = pass
+ path = "tmp/seatunnel/read/wildcard/"
+ file_format_type = "text"
+ plugin_output = "sftp"
+ schema = {
+ fields {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ }
+ }
+ }
+ }
+}
+
+sink {
+ Assert {
+ plugin_input = "sftp"
+ rules {
+ row_rules = [
+ {
+ rule_type = MIN_ROW
+ rule_value = 10
+ }
+ ],
+ field_rules = [
+ {
+ field_name = c_string
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_boolean
+ field_type = boolean
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_double
+ field_type = double
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file