This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 9f0b0258e2 Allow FTPHook to change port number (#39465)
9f0b0258e2 is described below

commit 9f0b0258e254e7fb98c7d641c6c1d0617775ba42
Author: Velmira Georgieva <[email protected]>
AuthorDate: Mon May 27 14:04:47 2024 +0300

    Allow FTPHook to change port number (#39465)
    
    * Allow FTPHook to change port
    
    * Add integration tests and fix a bug
    
    * Styling fix
    
    * Add logging of the FTP host and port
    
    * Update logging and test styling
    
    * styling fix
    
    * formatting change
    
    * Update airflow/providers/ftp/hooks/ftp.py
    
    * fix log statement formatting
    
    ---------
    
    Co-authored-by: Elad Kalif <[email protected]>
---
 airflow/providers/ftp/hooks/ftp.py    | 13 ++++++++++-
 tests/providers/ftp/hooks/test_ftp.py | 42 +++++++++++++++++++++++++++++++++++
 2 files changed, 54 insertions(+), 1 deletion(-)

diff --git a/airflow/providers/ftp/hooks/ftp.py b/airflow/providers/ftp/hooks/ftp.py
index 9f57efd9b6..ede52345a8 100644
--- a/airflow/providers/ftp/hooks/ftp.py
+++ b/airflow/providers/ftp/hooks/ftp.py
@@ -19,10 +19,13 @@ from __future__ import annotations
 
 import datetime
 import ftplib  # nosec: B402
+import logging
 from typing import Any, Callable
 
 from airflow.hooks.base import BaseHook
 
+logger = logging.getLogger(__name__)
+
 
 class FTPHook(BaseHook):
     """
@@ -58,7 +61,15 @@ class FTPHook(BaseHook):
         if self.conn is None:
             params = self.get_connection(self.ftp_conn_id)
             pasv = params.extra_dejson.get("passive", True)
-            self.conn = ftplib.FTP(params.host, params.login, params.password)  # nosec: B321
+            self.conn = ftplib.FTP()  # nosec: B321
+            if params.host:
+                port = ftplib.FTP_PORT
+                if params.port is not None:
+                    port = params.port
+                logger.info("Connecting via FTP to %s:%d", params.host, port)
+                self.conn.connect(params.host, port)
+                if params.login:
+                    self.conn.login(params.login, params.password)
             self.conn.set_pasv(pasv)
 
         return self.conn
diff --git a/tests/providers/ftp/hooks/test_ftp.py b/tests/providers/ftp/hooks/test_ftp.py
index 2f4239b243..f0f5c62379 100644
--- a/tests/providers/ftp/hooks/test_ftp.py
+++ b/tests/providers/ftp/hooks/test_ftp.py
@@ -155,6 +155,28 @@ class TestIntegrationFTPHook:
             Connection(conn_id="ftp_active", conn_type="ftp", host="localhost", extra='{"passive": false}')
         )
 
+        db.merge_conn(
+            Connection(
+                conn_id="ftp_custom_port",
+                conn_type="ftp",
+                host="localhost",
+                port=10000,
+                extra='{"passive": true}',
+            )
+        )
+
+        db.merge_conn(
+            Connection(
+                conn_id="ftp_custom_port_and_login",
+                conn_type="ftp",
+                host="localhost",
+                port=10000,
+                login="user",
+                password="pass123",
+                extra='{"passive": true}',
+            )
+        )
+
     def _test_mode(self, hook_type, connection_id, expected_mode):
         hook = hook_type(connection_id)
         conn = hook.get_conn()
@@ -172,6 +194,26 @@ class TestIntegrationFTPHook:
 
         self._test_mode(FTPHook, "ftp_active", False)
 
+    @mock.patch("ftplib.FTP")
+    def test_ftp_custom_port(self, mock_ftp):
+        from airflow.providers.ftp.hooks.ftp import FTPHook
+
+        hook = FTPHook("ftp_custom_port")
+        conn = hook.get_conn()
+        conn.connect.assert_called_once_with("localhost", 10000)
+        conn.login.assert_not_called()
+        conn.set_pasv.assert_called_once_with(True)
+
+    @mock.patch("ftplib.FTP")
+    def test_ftp_custom_port_and_login(self, mock_ftp):
+        from airflow.providers.ftp.hooks.ftp import FTPHook
+
+        hook = FTPHook("ftp_custom_port_and_login")
+        conn = hook.get_conn()
+        conn.connect.assert_called_once_with("localhost", 10000)
+        conn.login.assert_called_once_with("user", "pass123")
+        conn.set_pasv.assert_called_once_with(True)
+
     @mock.patch("ftplib.FTP_TLS")
     def test_ftps_passive_mode(self, mock_ftp):
         from airflow.providers.ftp.hooks.ftp import FTPSHook

Reply via email to