This is an automated email from the ASF dual-hosted git repository.
bolke pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 8e372e17eed Add SSH/SFTP support for ObjectStoragePath (#60757)
8e372e17eed is described below
commit 8e372e17eed89f3e96e107930a506c9f288f40f8
Author: anishgirianish <[email protected]>
AuthorDate: Sun Jan 25 08:26:46 2026 -0600
Add SSH/SFTP support for ObjectStoragePath (#60757)
* 60558 (feature) : added ssh sftp object store
* fix deprecation warning
* documentation clean ups
* clean ups
---
providers/sftp/docs/filesystems/index.rst | 26 +++
providers/sftp/docs/filesystems/sftp.rst | 63 ++++++
providers/sftp/docs/index.rst | 7 +
providers/sftp/provider.yaml | 3 +
providers/sftp/pyproject.toml | 3 +
.../sftp/src/airflow/providers/sftp/fs/__init__.py | 16 ++
.../sftp/src/airflow/providers/sftp/fs/sftp.py | 65 ++++++
.../airflow/providers/sftp/get_provider_info.py | 1 +
providers/sftp/tests/unit/sftp/fs/__init__.py | 16 ++
providers/sftp/tests/unit/sftp/fs/test_sftp.py | 222 +++++++++++++++++++++
10 files changed, 422 insertions(+)
diff --git a/providers/sftp/docs/filesystems/index.rst b/providers/sftp/docs/filesystems/index.rst
new file mode 100644
index 00000000000..eb0036177a3
--- /dev/null
+++ b/providers/sftp/docs/filesystems/index.rst
@@ -0,0 +1,26 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+Filesystems
+===========
+
+.. toctree::
+ :maxdepth: 1
+ :caption: Filesystem Providers
+ :glob:
+
+ *
diff --git a/providers/sftp/docs/filesystems/sftp.rst b/providers/sftp/docs/filesystems/sftp.rst
new file mode 100644
index 00000000000..5b1e402d6ce
--- /dev/null
+++ b/providers/sftp/docs/filesystems/sftp.rst
@@ -0,0 +1,63 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+SFTP Filesystem
+===============
+
+Use ``ObjectStoragePath`` with SFTP/SSH servers via the `sshfs <https://github.com/fsspec/sshfs>`__ library.
+
+.. code-block:: bash
+
+ pip install apache-airflow-providers-sftp[sshfs]
+
+URL format: ``sftp://connection_id@hostname/path/to/file`` (also supports ``ssh://``).
+
+Configuration
+-------------
+
+Uses the host, port (default 22), login and password of a standard SFTP connection. The following extras are also supported:
+
+* ``key_file`` - path to private key file
+* ``private_key`` - private key content (PEM format)
+* ``private_key_passphrase`` - passphrase for encrypted keys
+* ``no_host_key_check`` - set to ``true`` to skip host key verification; host keys are verified by default
+
+See :doc:`/connections/sftp` for details.
+
+Example
+-------
+
+.. code-block:: python
+
+ from airflow.sdk import ObjectStoragePath
+
+ path = ObjectStoragePath("sftp://my_conn@myserver/data/file.csv")
+
+ # read
+ with path.open() as f:
+ data = f.read()
+
+ # write
+ with path.open("w") as f:
+ f.write("content")
+
+ # list
+ for p in path.parent.iterdir():
+ print(p.name)
+
+ # copy
+ path.copy(ObjectStoragePath("file:///tmp/local.csv"))
diff --git a/providers/sftp/docs/index.rst b/providers/sftp/docs/index.rst
index 3b139178161..9c41b3f7cce 100644
--- a/providers/sftp/docs/index.rst
+++ b/providers/sftp/docs/index.rst
@@ -29,6 +29,13 @@
Changelog <changelog>
Security <security>
+.. toctree::
+ :hidden:
+ :maxdepth: 1
+ :caption: Guides
+
+ Filesystems <filesystems/index>
+
.. toctree::
:hidden:
:maxdepth: 1
diff --git a/providers/sftp/provider.yaml b/providers/sftp/provider.yaml
index 47ecfc60443..ceaa67f7021 100644
--- a/providers/sftp/provider.yaml
+++ b/providers/sftp/provider.yaml
@@ -123,3 +123,6 @@ triggers:
- integration-name: SSH File Transfer Protocol (SFTP)
python-modules:
- airflow.providers.sftp.triggers.sftp
+
+filesystems:
+ - airflow.providers.sftp.fs.sftp
diff --git a/providers/sftp/pyproject.toml b/providers/sftp/pyproject.toml
index 4d05ea7ed5c..d562e123d66 100644
--- a/providers/sftp/pyproject.toml
+++ b/providers/sftp/pyproject.toml
@@ -72,6 +72,9 @@ dependencies = [
"openlineage" = [
"apache-airflow-providers-openlineage"
]
+"sshfs" = [
+ "sshfs>=2023.1.0",
+]
[dependency-groups]
dev = [
diff --git a/providers/sftp/src/airflow/providers/sftp/fs/__init__.py b/providers/sftp/src/airflow/providers/sftp/fs/__init__.py
new file mode 100644
index 00000000000..13a83393a91
--- /dev/null
+++ b/providers/sftp/src/airflow/providers/sftp/fs/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/providers/sftp/src/airflow/providers/sftp/fs/sftp.py b/providers/sftp/src/airflow/providers/sftp/fs/sftp.py
new file mode 100644
index 00000000000..6a915db67d9
--- /dev/null
+++ b/providers/sftp/src/airflow/providers/sftp/fs/sftp.py
@@ -0,0 +1,85 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Any
+
+from airflow.sdk.bases.hook import BaseHook
+
+if TYPE_CHECKING:
+    from fsspec import AbstractFileSystem
+
+schemes = ["sftp", "ssh"]
+
+
+def get_fs(conn_id: str | None, storage_options: dict[str, Any] | None = None) -> AbstractFileSystem:
+    """
+    Create an ``sshfs.SSHFileSystem`` for an Airflow SFTP/SSH connection.
+
+    The connection's ``host``, ``port`` (default 22), ``login`` and, when set, ``password`` are
+    mapped to ``SSHFileSystem`` arguments, as are the extras ``key_file``, ``private_key``,
+    ``private_key_passphrase`` and ``no_host_key_check``.
+
+    :param conn_id: Airflow connection id. When ``None``, ``storage_options`` are passed to
+        ``SSHFileSystem`` unchanged.
+    :param storage_options: Extra ``SSHFileSystem`` keyword arguments; they override the values
+        derived from the connection.
+    :return: The configured filesystem.
+    :raises ImportError: If the optional ``sshfs`` dependency is not installed.
+    """
+    try:
+        from sshfs import SSHFileSystem
+    except ImportError as err:
+        raise ImportError(
+            "Airflow FS SFTP/SSH protocol requires the sshfs library. "
+            "Install with: pip install apache-airflow-providers-sftp[sshfs]"
+        ) from err
+
+    if conn_id is None:
+        return SSHFileSystem(**(storage_options or {}))
+
+    conn = BaseHook.get_connection(conn_id)
+    extras = conn.extra_dejson
+
+    options: dict[str, Any] = {
+        "host": conn.host,
+        "port": conn.port or 22,
+        "username": conn.login,
+    }
+
+    if conn.password:
+        options["password"] = conn.password
+
+    if key_file := extras.get("key_file"):
+        options["client_keys"] = [key_file]
+
+    # An inline ``private_key`` takes precedence over ``key_file`` when both are set.
+    # NOTE(review): asyncssh reads ``str`` entries of ``client_keys`` as file names and imports
+    # ``bytes`` as key data, so inline key content may need ``.encode()`` - confirm.
+    if private_key := extras.get("private_key"):
+        options["client_keys"] = [private_key]
+
+    # Passed whenever set, so an encrypted ``key_file`` can be decrypted too, not only ``private_key``.
+    if passphrase := extras.get("private_key_passphrase"):
+        options["passphrase"] = passphrase
+
+    # Only an explicit "true" disables host key verification (``known_hosts=None``).
+    if str(extras.get("no_host_key_check", "")).lower() == "true":
+        options["known_hosts"] = None
+
+    options.update(storage_options or {})
+    return SSHFileSystem(**options)
diff --git a/providers/sftp/src/airflow/providers/sftp/get_provider_info.py b/providers/sftp/src/airflow/providers/sftp/get_provider_info.py
index 09b57468026..87ce10a7af5 100644
--- a/providers/sftp/src/airflow/providers/sftp/get_provider_info.py
+++ b/providers/sftp/src/airflow/providers/sftp/get_provider_info.py
@@ -71,4 +71,5 @@ def get_provider_info():
"python-modules": ["airflow.providers.sftp.triggers.sftp"],
}
],
+ "filesystems": ["airflow.providers.sftp.fs.sftp"],
}
diff --git a/providers/sftp/tests/unit/sftp/fs/__init__.py b/providers/sftp/tests/unit/sftp/fs/__init__.py
new file mode 100644
index 00000000000..13a83393a91
--- /dev/null
+++ b/providers/sftp/tests/unit/sftp/fs/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/providers/sftp/tests/unit/sftp/fs/test_sftp.py b/providers/sftp/tests/unit/sftp/fs/test_sftp.py
new file mode 100644
index 00000000000..7f8da2bb3e4
--- /dev/null
+++ b/providers/sftp/tests/unit/sftp/fs/test_sftp.py
@@ -0,0 +1,226 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest.mock import patch
+
+import pytest
+
+pytest.importorskip("sshfs")
+
+TEST_CONN_ID = "sftp_test_conn"
+
+
+@pytest.fixture(scope="module", autouse=True)
+def _setup_connections():
+    """Expose ``TEST_CONN_ID`` as an ``AIRFLOW_CONN_*`` environment variable for this module."""
+    with pytest.MonkeyPatch.context() as mp_ctx:
+        mp_ctx.setenv(
+            f"AIRFLOW_CONN_{TEST_CONN_ID}".upper(),
+            "sftp://testuser:testpass@testhost:2222",
+        )
+        yield
+
+
+class TestSftpFilesystem:
+    """Tests for ``get_fs``; ``sshfs.SSHFileSystem`` is patched so no SSH server is contacted."""
+
+    def test_schemes(self):
+        from airflow.providers.sftp.fs.sftp import schemes
+
+        assert "sftp" in schemes
+        assert "ssh" in schemes
+
+    @patch("sshfs.SSHFileSystem", autospec=True)
+    def test_get_fs_with_connection(self, mock_sshfs):
+        from airflow.providers.sftp.fs.sftp import get_fs
+
+        get_fs(conn_id=TEST_CONN_ID)
+
+        mock_sshfs.assert_called_once()
+        call_kwargs = mock_sshfs.call_args.kwargs
+        assert call_kwargs["host"] == "testhost"
+        assert call_kwargs["port"] == 2222
+        assert call_kwargs["username"] == "testuser"
+        assert call_kwargs["password"] == "testpass"
+
+    @patch("sshfs.SSHFileSystem", autospec=True)
+    def test_get_fs_without_connection(self, mock_sshfs):
+        from airflow.providers.sftp.fs.sftp import get_fs
+
+        # When conn_id is None, storage_options are passed directly to SSHFileSystem
+        storage_options = {"host": "manual-host", "username": "manual-user"}
+        get_fs(conn_id=None, storage_options=storage_options)
+
+        mock_sshfs.assert_called_once()
+        call_kwargs = mock_sshfs.call_args.kwargs
+        assert call_kwargs["host"] == "manual-host"
+        assert call_kwargs["username"] == "manual-user"
+
+    @patch("sshfs.SSHFileSystem", autospec=True)
+    def test_get_fs_storage_options_merge(self, mock_sshfs):
+        from airflow.providers.sftp.fs.sftp import get_fs
+
+        storage_options = {"custom_option": "custom_value"}
+        get_fs(conn_id=TEST_CONN_ID, storage_options=storage_options)
+
+        mock_sshfs.assert_called_once()
+        call_kwargs = mock_sshfs.call_args.kwargs
+        assert call_kwargs["custom_option"] == "custom_value"
+        assert call_kwargs["host"] == "testhost"
+
+    @patch("sshfs.SSHFileSystem", autospec=True)
+    def test_get_fs_storage_options_override(self, mock_sshfs):
+        from airflow.providers.sftp.fs.sftp import get_fs
+
+        storage_options = {"port": 3333}
+        get_fs(conn_id=TEST_CONN_ID, storage_options=storage_options)
+
+        mock_sshfs.assert_called_once()
+        call_kwargs = mock_sshfs.call_args.kwargs
+        assert call_kwargs["port"] == 3333
+
+    @patch("sshfs.SSHFileSystem", autospec=True)
+    def test_get_fs_with_key_file(self, mock_sshfs):
+        from airflow.providers.sftp.fs.sftp import get_fs
+
+        with pytest.MonkeyPatch.context() as mp_ctx:
+            # The URL-encoded path (%2F) is expected to arrive decoded as "/path/to/key" in the extras.
+            mp_ctx.setenv(
+                "AIRFLOW_CONN_SFTP_KEY_FILE",
+                "sftp://testuser@testhost?key_file=%2Fpath%2Fto%2Fkey",
+            )
+
+            get_fs(conn_id="sftp_key_file")
+
+            mock_sshfs.assert_called_once()
+            call_kwargs = mock_sshfs.call_args.kwargs
+            assert call_kwargs["client_keys"] == ["/path/to/key"]
+
+    @patch("sshfs.SSHFileSystem", autospec=True)
+    def test_get_fs_with_private_key(self, mock_sshfs):
+        from airflow.providers.sftp.fs.sftp import get_fs
+
+        with pytest.MonkeyPatch.context() as mp_ctx:
+            mp_ctx.setenv(
+                "AIRFLOW_CONN_SFTP_PRIVATE_KEY",
+                "sftp://testuser@testhost?private_key=PRIVATE_KEY_CONTENT&private_key_passphrase=secret",
+            )
+
+            get_fs(conn_id="sftp_private_key")
+
+            mock_sshfs.assert_called_once()
+            call_kwargs = mock_sshfs.call_args.kwargs
+            assert call_kwargs["client_keys"] == ["PRIVATE_KEY_CONTENT"]
+            assert call_kwargs["passphrase"] == "secret"
+
+    @patch("sshfs.SSHFileSystem", autospec=True)
+    def test_get_fs_with_private_key_no_passphrase(self, mock_sshfs):
+        from airflow.providers.sftp.fs.sftp import get_fs
+
+        with pytest.MonkeyPatch.context() as mp_ctx:
+            mp_ctx.setenv(
+                "AIRFLOW_CONN_SFTP_PRIVATE_KEY_NO_PASS",
+                "sftp://testuser@testhost?private_key=PRIVATE_KEY_CONTENT",
+            )
+
+            get_fs(conn_id="sftp_private_key_no_pass")
+
+            mock_sshfs.assert_called_once()
+            call_kwargs = mock_sshfs.call_args.kwargs
+            assert call_kwargs["client_keys"] == ["PRIVATE_KEY_CONTENT"]
+            assert "passphrase" not in call_kwargs
+
+    @patch("sshfs.SSHFileSystem", autospec=True)
+    def test_get_fs_with_no_host_key_check(self, mock_sshfs):
+        from airflow.providers.sftp.fs.sftp import get_fs
+
+        with pytest.MonkeyPatch.context() as mp_ctx:
+            mp_ctx.setenv(
+                "AIRFLOW_CONN_SFTP_NO_HOST_CHECK",
+                "sftp://testuser@testhost?no_host_key_check=true",
+            )
+
+            get_fs(conn_id="sftp_no_host_check")
+
+            mock_sshfs.assert_called_once()
+            call_kwargs = mock_sshfs.call_args.kwargs
+            assert call_kwargs["known_hosts"] is None
+
+    @patch("sshfs.SSHFileSystem", autospec=True)
+    def test_get_fs_default_port(self, mock_sshfs):
+        from airflow.providers.sftp.fs.sftp import get_fs
+
+        with pytest.MonkeyPatch.context() as mp_ctx:
+            mp_ctx.setenv(
+                "AIRFLOW_CONN_SFTP_DEFAULT_PORT",
+                "sftp://testuser@testhost",
+            )
+
+            get_fs(conn_id="sftp_default_port")
+
+            mock_sshfs.assert_called_once()
+            call_kwargs = mock_sshfs.call_args.kwargs
+            assert call_kwargs["port"] == 22
+
+    @patch("sshfs.SSHFileSystem", autospec=True)
+    def test_get_fs_without_password(self, mock_sshfs):
+        from airflow.providers.sftp.fs.sftp import get_fs
+
+        with pytest.MonkeyPatch.context() as mp_ctx:
+            mp_ctx.setenv(
+                "AIRFLOW_CONN_SFTP_NO_PASSWORD",
+                "sftp://testuser@testhost",
+            )
+
+            get_fs(conn_id="sftp_no_password")
+
+            mock_sshfs.assert_called_once()
+            call_kwargs = mock_sshfs.call_args.kwargs
+            assert "password" not in call_kwargs
+
+    @patch("sshfs.SSHFileSystem", autospec=True)
+    def test_get_fs_host_key_check_enabled_by_default(self, mock_sshfs):
+        from airflow.providers.sftp.fs.sftp import get_fs
+
+        with pytest.MonkeyPatch.context() as mp_ctx:
+            mp_ctx.setenv(
+                "AIRFLOW_CONN_SFTP_HOST_CHECK_DEFAULT",
+                "sftp://testuser@testhost",
+            )
+
+            get_fs(conn_id="sftp_host_check_default")
+
+            mock_sshfs.assert_called_once()
+            call_kwargs = mock_sshfs.call_args.kwargs
+            assert "known_hosts" not in call_kwargs
+
+    @patch("sshfs.SSHFileSystem", autospec=True)
+    def test_get_fs_host_key_check_explicit_false(self, mock_sshfs):
+        from airflow.providers.sftp.fs.sftp import get_fs
+
+        with pytest.MonkeyPatch.context() as mp_ctx:
+            mp_ctx.setenv(
+                "AIRFLOW_CONN_SFTP_HOST_CHECK_FALSE",
+                "sftp://testuser@testhost?no_host_key_check=false",
+            )
+
+            get_fs(conn_id="sftp_host_check_false")
+
+            mock_sshfs.assert_called_once()
+            call_kwargs = mock_sshfs.call_args.kwargs
+            assert "known_hosts" not in call_kwargs