This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 027cc16 Improve type annotations for Ftp provider (#9868)
027cc16 is described below
commit 027cc1682c3b068dfeee143ca538b5e8dadfcd17
Author: Alexander Sutcliffe <[email protected]>
AuthorDate: Fri Jul 17 16:41:31 2020 +0200
Improve type annotations for Ftp provider (#9868)
---
airflow/providers/ftp/hooks/ftp.py | 27 ++++++++++++++-------------
airflow/providers/ftp/sensors/ftp.py | 14 +++++++-------
2 files changed, 21 insertions(+), 20 deletions(-)
diff --git a/airflow/providers/ftp/hooks/ftp.py b/airflow/providers/ftp/hooks/ftp.py
index f0e9355..f569e59 100644
--- a/airflow/providers/ftp/hooks/ftp.py
+++ b/airflow/providers/ftp/hooks/ftp.py
@@ -20,11 +20,12 @@
import datetime
import ftplib
import os.path
+from typing import Generator, List, Optional, Union
from airflow.hooks.base_hook import BaseHook
-def mlsd(conn, path="", facts=None):
+def mlsd(conn, path: str = "", facts: Optional[Union[str, List[str]]] = None) -> Generator:
"""
BACKPORT FROM PYTHON3 FTPLIB.
@@ -46,7 +47,7 @@ def mlsd(conn, path="", facts=None):
cmd = "MLSD %s" % path
else:
cmd = "MLSD"
- lines = []
+ lines: List = []
conn.retrlines(cmd, lines.append)
for line in lines:
facts_found, _, name = line.rstrip(ftplib.CRLF).partition(' ')
@@ -66,10 +67,10 @@ class FTPHook(BaseHook):
connection as ``{"passive": "true"}``.
"""
- def __init__(self, ftp_conn_id='ftp_default'):
+ def __init__(self, ftp_conn_id: str = 'ftp_default') -> None:
super().__init__()
self.ftp_conn_id = ftp_conn_id
- self.conn = None
+ self.conn: Optional[ftplib.FTP] = None
def __enter__(self):
return self
@@ -78,7 +79,7 @@ class FTPHook(BaseHook):
if self.conn is not None:
self.close_conn()
- def get_conn(self):
+ def get_conn(self) -> ftplib.FTP:
"""
Returns a FTP connection object
"""
@@ -99,7 +100,7 @@ class FTPHook(BaseHook):
conn.quit()
self.conn = None
- def describe_directory(self, path):
+ def describe_directory(self, path: str) -> dict:
"""
Returns a dictionary of {filename: {attributes}} for all files
on the remote system (where the MLSD command is supported).
@@ -116,7 +117,7 @@ class FTPHook(BaseHook):
files = dict(mlsd(conn))
return files
- def list_directory(self, path):
+ def list_directory(self, path: str) -> List[str]:
"""
Returns a list of files on the remote system.
@@ -129,7 +130,7 @@ class FTPHook(BaseHook):
files = conn.nlst()
return files
- def create_directory(self, path):
+ def create_directory(self, path: str) -> None:
"""
Creates a directory on the remote system.
@@ -139,7 +140,7 @@ class FTPHook(BaseHook):
conn = self.get_conn()
conn.mkd(path)
- def delete_directory(self, path):
+ def delete_directory(self, path: str) -> None:
"""
Deletes a directory on the remote system.
@@ -252,7 +253,7 @@ class FTPHook(BaseHook):
if is_path:
input_handle.close()
- def delete_file(self, path):
+ def delete_file(self, path: str) -> None:
"""
Removes a file on the FTP Server.
@@ -262,7 +263,7 @@ class FTPHook(BaseHook):
conn = self.get_conn()
conn.delete(path)
- def rename(self, from_name, to_name):
+ def rename(self, from_name: str, to_name: str) -> str:
"""
Rename a file.
@@ -272,7 +273,7 @@ class FTPHook(BaseHook):
conn = self.get_conn()
return conn.rename(from_name, to_name)
- def get_mod_time(self, path):
+ def get_mod_time(self, path: str) -> datetime.datetime:
"""
Returns a datetime object representing the last time the file was
modified
@@ -303,7 +304,7 @@ class FTPSHook(FTPHook):
"""
Interact with FTPS.
"""
- def get_conn(self):
+ def get_conn(self) -> ftplib.FTP:
"""
Returns a FTPS connection object.
"""
diff --git a/airflow/providers/ftp/sensors/ftp.py b/airflow/providers/ftp/sensors/ftp.py
index fdaaf1a..a36e558 100644
--- a/airflow/providers/ftp/sensors/ftp.py
+++ b/airflow/providers/ftp/sensors/ftp.py
@@ -46,18 +46,18 @@ class FTPSensor(BaseSensorOperator):
@apply_defaults
def __init__(
self,
- path,
- ftp_conn_id='ftp_default',
- fail_on_transient_errors=True,
+ path: str,
+ ftp_conn_id: str = 'ftp_default',
+ fail_on_transient_errors: bool = True,
*args,
- **kwargs):
+ **kwargs) -> None:
super().__init__(*args, **kwargs)
self.path = path
self.ftp_conn_id = ftp_conn_id
self.fail_on_transient_errors = fail_on_transient_errors
- def _create_hook(self):
+ def _create_hook(self) -> FTPHook:
"""Return connection hook."""
return FTPHook(ftp_conn_id=self.ftp_conn_id)
@@ -70,7 +70,7 @@ class FTPSensor(BaseSensorOperator):
except ValueError:
return e
- def poke(self, context):
+ def poke(self, context: dict) -> bool:
with self._create_hook() as hook:
self.log.info('Poking for %s', self.path)
try:
@@ -90,6 +90,6 @@ class FTPSensor(BaseSensorOperator):
class FTPSSensor(FTPSensor):
"""Waits for a file or directory to be present on FTP over SSL."""
- def _create_hook(self):
+ def _create_hook(self) -> FTPHook:
"""Return connection hook."""
return FTPSHook(ftp_conn_id=self.ftp_conn_id)