This is an automated email from the ASF dual-hosted git repository.

bolke pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 8e372e17eed Add SSH/SFTP support for ObjectStoragePath (#60757)
8e372e17eed is described below

commit 8e372e17eed89f3e96e107930a506c9f288f40f8
Author: anishgirianish <[email protected]>
AuthorDate: Sun Jan 25 08:26:46 2026 -0600

    Add SSH/SFTP support for ObjectStoragePath (#60757)
    
    * 60558 (feature) : added ssh sftp object store
    
    * fix deprecation warning
    
    * documentation clean ups
    
    * clean ups
---
 providers/sftp/docs/filesystems/index.rst          |  26 +++
 providers/sftp/docs/filesystems/sftp.rst           |  63 ++++++
 providers/sftp/docs/index.rst                      |   7 +
 providers/sftp/provider.yaml                       |   3 +
 providers/sftp/pyproject.toml                      |   3 +
 .../sftp/src/airflow/providers/sftp/fs/__init__.py |  16 ++
 .../sftp/src/airflow/providers/sftp/fs/sftp.py     |  65 ++++++
 .../airflow/providers/sftp/get_provider_info.py    |   1 +
 providers/sftp/tests/unit/sftp/fs/__init__.py      |  16 ++
 providers/sftp/tests/unit/sftp/fs/test_sftp.py     | 222 +++++++++++++++++++++
 10 files changed, 422 insertions(+)

diff --git a/providers/sftp/docs/filesystems/index.rst b/providers/sftp/docs/filesystems/index.rst
new file mode 100644
index 00000000000..eb0036177a3
--- /dev/null
+++ b/providers/sftp/docs/filesystems/index.rst
@@ -0,0 +1,26 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+Filesystems
+===========
+
+.. toctree::
+    :maxdepth: 1
+    :caption: Filesystem Providers
+    :glob:
+
+    *
diff --git a/providers/sftp/docs/filesystems/sftp.rst b/providers/sftp/docs/filesystems/sftp.rst
new file mode 100644
index 00000000000..5b1e402d6ce
--- /dev/null
+++ b/providers/sftp/docs/filesystems/sftp.rst
@@ -0,0 +1,63 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+SFTP Filesystem
+===============
+
+Use ``ObjectStoragePath`` with SFTP/SSH servers via the `sshfs <https://github.com/fsspec/sshfs>`__ library.
+
+.. code-block:: bash
+
+    pip install apache-airflow-providers-sftp[sshfs]
+
+URL format: ``sftp://connection_id@hostname/path/to/file`` (also supports ``ssh://``).
+
+Configuration
+-------------
+
+Uses the standard SFTP connection. The following extras are supported:
+
+* ``key_file`` - path to private key file
+* ``private_key`` - private key content (PEM format)
+* ``private_key_passphrase`` - passphrase for encrypted keys
+* ``no_host_key_check`` - set to ``true`` to skip host key verification
+
+See :doc:`/connections/sftp` for details.
+
+Example
+-------
+
+.. code-block:: python
+
+    from airflow.sdk import ObjectStoragePath
+
+    path = ObjectStoragePath("sftp://my_conn@myserver/data/file.csv")
+
+    # read
+    with path.open() as f:
+        data = f.read()
+
+    # write
+    with path.open("w") as f:
+        f.write("content")
+
+    # list
+    for p in path.parent.iterdir():
+        print(p.name)
+
+    # copy
+    path.copy(ObjectStoragePath("file:///tmp/local.csv"))
diff --git a/providers/sftp/docs/index.rst b/providers/sftp/docs/index.rst
index 3b139178161..9c41b3f7cce 100644
--- a/providers/sftp/docs/index.rst
+++ b/providers/sftp/docs/index.rst
@@ -29,6 +29,13 @@
     Changelog <changelog>
     Security <security>
 
+.. toctree::
+    :hidden:
+    :maxdepth: 1
+    :caption: Guides
+
+    Filesystems <filesystems/index>
+
 .. toctree::
     :hidden:
     :maxdepth: 1
diff --git a/providers/sftp/provider.yaml b/providers/sftp/provider.yaml
index 47ecfc60443..ceaa67f7021 100644
--- a/providers/sftp/provider.yaml
+++ b/providers/sftp/provider.yaml
@@ -123,3 +123,6 @@ triggers:
   - integration-name: SSH File Transfer Protocol (SFTP)
     python-modules:
       - airflow.providers.sftp.triggers.sftp
+
+filesystems:
+  - airflow.providers.sftp.fs.sftp
diff --git a/providers/sftp/pyproject.toml b/providers/sftp/pyproject.toml
index 4d05ea7ed5c..d562e123d66 100644
--- a/providers/sftp/pyproject.toml
+++ b/providers/sftp/pyproject.toml
@@ -72,6 +72,9 @@ dependencies = [
 "openlineage" = [
     "apache-airflow-providers-openlineage"
 ]
+"sshfs" = [
+    "sshfs>=2023.1.0",
+]
 
 [dependency-groups]
 dev = [
diff --git a/providers/sftp/src/airflow/providers/sftp/fs/__init__.py b/providers/sftp/src/airflow/providers/sftp/fs/__init__.py
new file mode 100644
index 00000000000..13a83393a91
--- /dev/null
+++ b/providers/sftp/src/airflow/providers/sftp/fs/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/providers/sftp/src/airflow/providers/sftp/fs/sftp.py b/providers/sftp/src/airflow/providers/sftp/fs/sftp.py
new file mode 100644
index 00000000000..6a915db67d9
--- /dev/null
+++ b/providers/sftp/src/airflow/providers/sftp/fs/sftp.py
@@ -0,0 +1,65 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Any
+
+from airflow.sdk.bases.hook import BaseHook
+
+if TYPE_CHECKING:
+    from fsspec import AbstractFileSystem
+
+schemes = ["sftp", "ssh"]
+
+
+def get_fs(conn_id: str | None, storage_options: dict[str, Any] | None = None) -> AbstractFileSystem:
+    """
+    Create an sshfs ``SSHFileSystem`` for the Airflow SFTP/SSH connection ``conn_id``.
+
+    Without ``conn_id``, ``storage_options`` go to sshfs as-is; otherwise they override connection values.
+    """
+    try:
+        from sshfs import SSHFileSystem
+    except ImportError as err:
+        raise ImportError(
+            "Airflow FS SFTP/SSH protocol requires the sshfs library. "
+            "Install with: pip install apache-airflow-providers-sftp[sshfs]"
+        ) from err
+
+    if conn_id is None:
+        return SSHFileSystem(**(storage_options or {}))
+
+    conn = BaseHook.get_connection(conn_id)
+    extras = conn.extra_dejson
+    options: dict[str, Any] = {"host": conn.host, "port": conn.port or 22, "username": conn.login}
+    if conn.password:
+        options["password"] = conn.password
+
+    if key_file := extras.get("key_file"):
+        options["client_keys"] = [key_file]
+    if private_key := extras.get("private_key"):
+        # asyncssh treats str entries in client_keys as file paths, so inline key data must be bytes.
+        options["client_keys"] = [private_key.encode()]
+    # asyncssh applies one passphrase to all client keys, whether from key_file or private_key.
+    if "client_keys" in options and (passphrase := extras.get("private_key_passphrase")):
+        options["passphrase"] = passphrase
+
+    if str(extras.get("no_host_key_check", "")).lower() == "true":
+        options["known_hosts"] = None
+
+    options.update(storage_options or {})
+    return SSHFileSystem(**options)
diff --git a/providers/sftp/src/airflow/providers/sftp/get_provider_info.py b/providers/sftp/src/airflow/providers/sftp/get_provider_info.py
index 09b57468026..87ce10a7af5 100644
--- a/providers/sftp/src/airflow/providers/sftp/get_provider_info.py
+++ b/providers/sftp/src/airflow/providers/sftp/get_provider_info.py
@@ -71,4 +71,5 @@ def get_provider_info():
                 "python-modules": ["airflow.providers.sftp.triggers.sftp"],
             }
         ],
+        "filesystems": ["airflow.providers.sftp.fs.sftp"],
     }
diff --git a/providers/sftp/tests/unit/sftp/fs/__init__.py b/providers/sftp/tests/unit/sftp/fs/__init__.py
new file mode 100644
index 00000000000..13a83393a91
--- /dev/null
+++ b/providers/sftp/tests/unit/sftp/fs/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/providers/sftp/tests/unit/sftp/fs/test_sftp.py b/providers/sftp/tests/unit/sftp/fs/test_sftp.py
new file mode 100644
index 00000000000..7f8da2bb3e4
--- /dev/null
+++ b/providers/sftp/tests/unit/sftp/fs/test_sftp.py
@@ -0,0 +1,222 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest.mock import patch
+
+import pytest
+
+pytest.importorskip("sshfs")
+
+TEST_CONN_ID = "sftp_test_conn"
+
+
+@pytest.fixture(scope="module", autouse=True)
+def _setup_connections():
+    with pytest.MonkeyPatch.context() as mp_ctx:
+        mp_ctx.setenv(
+            f"AIRFLOW_CONN_{TEST_CONN_ID}".upper(),
+            "sftp://testuser:testpass@testhost:2222",
+        )
+        yield
+
+
+class TestSftpFilesystem:
+    def test_schemes(self):
+        from airflow.providers.sftp.fs.sftp import schemes
+
+        assert "sftp" in schemes
+        assert "ssh" in schemes
+
+    @patch("sshfs.SSHFileSystem", autospec=True)
+    def test_get_fs_with_connection(self, mock_sshfs):
+        from airflow.providers.sftp.fs.sftp import get_fs
+
+        get_fs(conn_id=TEST_CONN_ID)
+
+        mock_sshfs.assert_called_once()
+        call_kwargs = mock_sshfs.call_args.kwargs
+        assert call_kwargs["host"] == "testhost"
+        assert call_kwargs["port"] == 2222
+        assert call_kwargs["username"] == "testuser"
+        assert call_kwargs["password"] == "testpass"
+
+    @patch("sshfs.SSHFileSystem", autospec=True)
+    def test_get_fs_without_connection(self, mock_sshfs):
+        from airflow.providers.sftp.fs.sftp import get_fs
+
+        # When conn_id is None, storage_options are passed directly to SSHFileSystem
+        storage_options = {"host": "manual-host", "username": "manual-user"}
+        get_fs(conn_id=None, storage_options=storage_options)
+
+        mock_sshfs.assert_called_once()
+        call_kwargs = mock_sshfs.call_args.kwargs
+        assert call_kwargs["host"] == "manual-host"
+        assert call_kwargs["username"] == "manual-user"
+
+    @patch("sshfs.SSHFileSystem", autospec=True)
+    def test_get_fs_storage_options_merge(self, mock_sshfs):
+        from airflow.providers.sftp.fs.sftp import get_fs
+
+        storage_options = {"custom_option": "custom_value"}
+        get_fs(conn_id=TEST_CONN_ID, storage_options=storage_options)
+
+        mock_sshfs.assert_called_once()
+        call_kwargs = mock_sshfs.call_args.kwargs
+        assert call_kwargs["custom_option"] == "custom_value"
+        assert call_kwargs["host"] == "testhost"
+
+    @patch("sshfs.SSHFileSystem", autospec=True)
+    def test_get_fs_storage_options_override(self, mock_sshfs):
+        from airflow.providers.sftp.fs.sftp import get_fs
+
+        storage_options = {"port": 3333}
+        get_fs(conn_id=TEST_CONN_ID, storage_options=storage_options)
+
+        mock_sshfs.assert_called_once()
+        call_kwargs = mock_sshfs.call_args.kwargs
+        assert call_kwargs["port"] == 3333
+
+    @patch("sshfs.SSHFileSystem", autospec=True)
+    def test_get_fs_with_key_file(self, mock_sshfs):
+        from airflow.providers.sftp.fs.sftp import get_fs
+
+        with pytest.MonkeyPatch.context() as mp_ctx:
+            mp_ctx.setenv(
+                "AIRFLOW_CONN_SFTP_KEY_FILE",
+                "sftp://testuser@testhost?key_file=%2Fpath%2Fto%2Fkey",
+            )
+
+            get_fs(conn_id="sftp_key_file")
+
+            mock_sshfs.assert_called_once()
+            call_kwargs = mock_sshfs.call_args.kwargs
+            assert call_kwargs["client_keys"] == ["/path/to/key"]
+
+    @patch("sshfs.SSHFileSystem", autospec=True)
+    def test_get_fs_with_private_key(self, mock_sshfs):
+        from airflow.providers.sftp.fs.sftp import get_fs
+
+        with pytest.MonkeyPatch.context() as mp_ctx:
+            mp_ctx.setenv(
+                "AIRFLOW_CONN_SFTP_PRIVATE_KEY",
+                "sftp://testuser@testhost?private_key=PRIVATE_KEY_CONTENT&private_key_passphrase=secret",
+            )
+
+            get_fs(conn_id="sftp_private_key")
+
+            mock_sshfs.assert_called_once()
+            call_kwargs = mock_sshfs.call_args.kwargs
+            assert call_kwargs["client_keys"] == [b"PRIVATE_KEY_CONTENT"]
+            assert call_kwargs["passphrase"] == "secret"
+
+    @patch("sshfs.SSHFileSystem", autospec=True)
+    def test_get_fs_with_private_key_no_passphrase(self, mock_sshfs):
+        from airflow.providers.sftp.fs.sftp import get_fs
+
+        with pytest.MonkeyPatch.context() as mp_ctx:
+            mp_ctx.setenv(
+                "AIRFLOW_CONN_SFTP_PRIVATE_KEY_NO_PASS",
+                "sftp://testuser@testhost?private_key=PRIVATE_KEY_CONTENT",
+            )
+
+            get_fs(conn_id="sftp_private_key_no_pass")
+
+            mock_sshfs.assert_called_once()
+            call_kwargs = mock_sshfs.call_args.kwargs
+            assert call_kwargs["client_keys"] == [b"PRIVATE_KEY_CONTENT"]
+            assert "passphrase" not in call_kwargs
+
+    @patch("sshfs.SSHFileSystem", autospec=True)
+    def test_get_fs_with_no_host_key_check(self, mock_sshfs):
+        from airflow.providers.sftp.fs.sftp import get_fs
+
+        with pytest.MonkeyPatch.context() as mp_ctx:
+            mp_ctx.setenv(
+                "AIRFLOW_CONN_SFTP_NO_HOST_CHECK",
+                "sftp://testuser@testhost?no_host_key_check=true",
+            )
+
+            get_fs(conn_id="sftp_no_host_check")
+
+            mock_sshfs.assert_called_once()
+            call_kwargs = mock_sshfs.call_args.kwargs
+            assert call_kwargs["known_hosts"] is None
+
+    @patch("sshfs.SSHFileSystem", autospec=True)
+    def test_get_fs_default_port(self, mock_sshfs):
+        from airflow.providers.sftp.fs.sftp import get_fs
+
+        with pytest.MonkeyPatch.context() as mp_ctx:
+            mp_ctx.setenv(
+                "AIRFLOW_CONN_SFTP_DEFAULT_PORT",
+                "sftp://testuser@testhost",
+            )
+
+            get_fs(conn_id="sftp_default_port")
+
+            mock_sshfs.assert_called_once()
+            call_kwargs = mock_sshfs.call_args.kwargs
+            assert call_kwargs["port"] == 22
+
+    @patch("sshfs.SSHFileSystem", autospec=True)
+    def test_get_fs_without_password(self, mock_sshfs):
+        from airflow.providers.sftp.fs.sftp import get_fs
+
+        with pytest.MonkeyPatch.context() as mp_ctx:
+            mp_ctx.setenv(
+                "AIRFLOW_CONN_SFTP_NO_PASSWORD",
+                "sftp://testuser@testhost",
+            )
+
+            get_fs(conn_id="sftp_no_password")
+
+            mock_sshfs.assert_called_once()
+            call_kwargs = mock_sshfs.call_args.kwargs
+            assert "password" not in call_kwargs
+
+    @patch("sshfs.SSHFileSystem", autospec=True)
+    def test_get_fs_host_key_check_enabled_by_default(self, mock_sshfs):
+        from airflow.providers.sftp.fs.sftp import get_fs
+
+        with pytest.MonkeyPatch.context() as mp_ctx:
+            mp_ctx.setenv(
+                "AIRFLOW_CONN_SFTP_HOST_CHECK_DEFAULT",
+                "sftp://testuser@testhost",
+            )
+
+            get_fs(conn_id="sftp_host_check_default")
+
+            mock_sshfs.assert_called_once()
+            call_kwargs = mock_sshfs.call_args.kwargs
+            assert "known_hosts" not in call_kwargs
+
+    @patch("sshfs.SSHFileSystem", autospec=True)
+    def test_get_fs_host_key_check_explicit_false(self, mock_sshfs):
+        from airflow.providers.sftp.fs.sftp import get_fs
+
+        with pytest.MonkeyPatch.context() as mp_ctx:
+            mp_ctx.setenv(
+                "AIRFLOW_CONN_SFTP_HOST_CHECK_FALSE",
+                "sftp://testuser@testhost?no_host_key_check=false",
+            )
+
+            get_fs(conn_id="sftp_host_check_false")
+
+            mock_sshfs.assert_called_once()
+            call_kwargs = mock_sshfs.call_args.kwargs
+            assert "known_hosts" not in call_kwargs

Reply via email to