This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new d3c76da  Improve type hinting to provider microsoft  (#9774)
d3c76da is described below

commit d3c76da95250068161580036a86e26ee2790fa07
Author: Guilherme Da Silva Gonçalves <[email protected]>
AuthorDate: Sun Jul 12 13:25:49 2020 -0300

    Improve type hinting to provider microsoft  (#9774)
---
 .../microsoft/azure/operators/adls_list.py         | 11 ++--
 airflow/providers/microsoft/azure/operators/adx.py |  4 +-
 .../microsoft/azure/operators/azure_batch.py       | 45 ++++++++-------
 .../azure/operators/azure_container_instances.py   | 65 +++++++++++-----------
 .../microsoft/azure/operators/azure_cosmos.py      | 13 +++--
 .../microsoft/azure/operators/wasb_delete_blob.py  | 16 ++++--
 .../microsoft/azure/sensors/azure_cosmos.py        | 14 +++--
 airflow/providers/microsoft/azure/sensors/wasb.py  | 24 ++++++--
 .../microsoft/azure/transfers/file_to_wasb.py      | 18 ++++--
 .../azure/transfers/oracle_to_azure_data_lake.py   | 33 ++++++-----
 10 files changed, 142 insertions(+), 101 deletions(-)
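
For reference, the pattern applied throughout this change: a parameter whose
default is a concrete value drops Optional and takes the plain type, while a
parameter defaulting to None keeps Optional[...]. A minimal sketch of the
convention (the function below is illustrative, not part of the commit; the
two parameter annotations are taken verbatim from the diff):

    from typing import Optional

    def configure(batch_max_retries: int = 3,                # concrete default -> plain int
                  auto_scale_formula: Optional[str] = None,  # None default -> Optional[str]
                  ) -> None:
        ...  # annotations only; mypy flags configure(batch_max_retries=None)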

diff --git a/airflow/providers/microsoft/azure/operators/adls_list.py b/airflow/providers/microsoft/azure/operators/adls_list.py
index 8832bff..f184122 100644
--- a/airflow/providers/microsoft/azure/operators/adls_list.py
+++ b/airflow/providers/microsoft/azure/operators/adls_list.py
@@ -16,7 +16,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from typing import Iterable
+from typing import Any, Dict, Iterable, List
 
 from airflow.models import BaseOperator
 from airflow.providers.microsoft.azure.hooks.azure_data_lake import AzureDataLakeHook
@@ -52,15 +52,16 @@ class AzureDataLakeStorageListOperator(BaseOperator):
 
     @apply_defaults
     def __init__(self,
-                 path,
-                 azure_data_lake_conn_id='azure_data_lake_default',
+                 path: str,
+                 azure_data_lake_conn_id: str = 'azure_data_lake_default',
                  *args,
-                 **kwargs):
+                 **kwargs) -> None:
         super().__init__(*args, **kwargs)
         self.path = path
         self.azure_data_lake_conn_id = azure_data_lake_conn_id
 
-    def execute(self, context):
+    def execute(self,
+                context: Dict[Any, Any]) -> List:
 
         hook = AzureDataLakeHook(
             azure_data_lake_conn_id=self.azure_data_lake_conn_id
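
For context, an instantiation under the new signature (an illustrative sketch,
not from the commit; the task id and path value are assumptions):

    from airflow.providers.microsoft.azure.operators.adls_list import AzureDataLakeStorageListOperator

    list_files = AzureDataLakeStorageListOperator(
        task_id='list_adls_files',                          # hypothetical task id
        path='folder/output/*.parquet',                     # `path` is now annotated as str
        azure_data_lake_conn_id='azure_data_lake_default',  # matches the annotated default
    )
    # execute() is now declared to return a List.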
diff --git a/airflow/providers/microsoft/azure/operators/adx.py b/airflow/providers/microsoft/azure/operators/adx.py
index 0f64bb1..ebd1e95 100644
--- a/airflow/providers/microsoft/azure/operators/adx.py
+++ b/airflow/providers/microsoft/azure/operators/adx.py
@@ -18,7 +18,7 @@
 #
 
 """This module contains Azure Data Explorer operators"""
-from typing import Dict, Optional
+from typing import Any, Dict, Optional
 
 from azure.kusto.data._models import KustoResultTable
 
@@ -65,7 +65,7 @@ class AzureDataExplorerQueryOperator(BaseOperator):
         """Returns new instance of AzureDataExplorerHook"""
         return AzureDataExplorerHook(self.azure_data_explorer_conn_id)
 
-    def execute(self, context) -> KustoResultTable:
+    def execute(self, context: Dict[Any, Any]) -> KustoResultTable:
         """
         Run KQL Query on Azure Data Explorer (Kusto).
         Returns `PrimaryResult` of Query v2 HTTP response contents
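
With execute() annotated to return KustoResultTable, downstream code can be
checked against it. A hedged sketch (the helper function is hypothetical, and
the .rows attribute is assumed from the azure-kusto-data result model):

    from typing import Any, Dict

    from azure.kusto.data._models import KustoResultTable

    from airflow.providers.microsoft.azure.operators.adx import AzureDataExplorerQueryOperator

    def row_count(op: AzureDataExplorerQueryOperator, context: Dict[Any, Any]) -> int:
        table: KustoResultTable = op.execute(context)  # mypy checks this assignment
        return len(table.rows)                         # assumes the result exposes .rows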
diff --git a/airflow/providers/microsoft/azure/operators/azure_batch.py b/airflow/providers/microsoft/azure/operators/azure_batch.py
index b919be9..ff3f35a 100644
--- a/airflow/providers/microsoft/azure/operators/azure_batch.py
+++ b/airflow/providers/microsoft/azure/operators/azure_batch.py
@@ -16,7 +16,7 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-from typing import List, Optional
+from typing import Any, Dict, List, Optional
 
 from azure.batch import models as batch_models
 
@@ -79,8 +79,8 @@ class AzureBatchOperator(BaseOperator):
     :type batch_start_task: Optional[batch_models.StartTask]
 
     :param batch_max_retries: The number of times to retry this batch operation before it's
-        considered a failed operation
-    :type batch_max_retries: Optional[int]
+        considered a failed operation. Default is 3
+    :type batch_max_retries: int
 
     :param batch_task_resource_files: A list of files that the Batch service will
         download to the Compute Node before running the command line.
@@ -102,8 +102,8 @@ class AzureBatchOperator(BaseOperator):
         This property must not be specified if enable_auto_scale is set to true.
     :type target_dedicated_nodes: Optional[int]
 
-    :param enable_auto_scale: Whether the Pool size should automatically adjust over time
-    :type enable_auto_scale: Optional[bool]
+    :param enable_auto_scale: Whether the Pool size should automatically adjust over time. Default is false
+    :type enable_auto_scale: bool
 
     :param auto_scale_formula: A formula for the desired number of Compute Nodes in the Pool.
         This property must not be specified if enableAutoScale is set to false.
@@ -114,8 +114,8 @@ class AzureBatchOperator(BaseOperator):
     :type azure_batch_conn_id: str
 
     :param use_latest_verified_vm_image_and_sku: Whether to use the latest verified virtual
-        machine image and sku in the batch account
-    :type use_latest_verified_vm_image_and_sku: Optional[bool]
+        machine image and sku in the batch account. Default is false.
+    :type use_latest_verified_vm_image_and_sku: bool
 
     :param vm_publisher: The publisher of the Azure Virtual Machines Marketplace Image.
         For example, Canonical or MicrosoftWindowsServer. Required if
@@ -131,14 +131,14 @@ class AzureBatchOperator(BaseOperator):
         use_latest_image_and_sku is set to True
     :type sku_starts_with: Optional[str]
 
-    :param timeout: The amount of time to wait for the job to complete in minutes
-    :type timeout: Optional[int]
+    :param timeout: The amount of time to wait for the job to complete in minutes. Default is 25
+    :type timeout: int
 
     :param should_delete_job: Whether to delete job after execution. Default is False
-    :type should_delete_job: Optional[bool]
+    :type should_delete_job: bool
 
     :param should_delete_pool: Whether to delete pool after execution of jobs. Default is False
-    :type should_delete_pool: Optional[bool]
+    :type should_delete_pool: bool
 
 
     """
@@ -162,22 +162,22 @@ class AzureBatchOperator(BaseOperator):
                  batch_task_display_name: Optional[str] = None,
                  batch_task_container_settings: Optional[batch_models.TaskContainerSettings] = None,
                  batch_start_task: Optional[batch_models.StartTask] = None,
-                 batch_max_retries: Optional[int] = 3,
+                 batch_max_retries: int = 3,
                  batch_task_resource_files: Optional[List[batch_models.ResourceFile]] = None,
                  batch_task_output_files: Optional[List[batch_models.OutputFile]] = None,
                  batch_task_user_identity: Optional[batch_models.UserIdentity] = None,
                  target_low_priority_nodes: Optional[int] = None,
                  target_dedicated_nodes: Optional[int] = None,
-                 enable_auto_scale: Optional[bool] = False,
+                 enable_auto_scale: bool = False,
                  auto_scale_formula: Optional[str] = None,
                  azure_batch_conn_id='azure_batch_default',
-                 use_latest_verified_vm_image_and_sku: Optional[bool] = False,
+                 use_latest_verified_vm_image_and_sku: bool = False,
                  vm_publisher: Optional[str] = None,
                  vm_offer: Optional[str] = None,
                  sku_starts_with: Optional[str] = None,
-                 timeout: Optional[int] = 25,
-                 should_delete_job: Optional[bool] = False,
-                 should_delete_pool: Optional[bool] = False,
+                 timeout: int = 25,
+                 should_delete_job: bool = False,
+                 should_delete_pool: bool = False,
                  *args,
                  **kwargs) -> None:
 
@@ -213,7 +213,7 @@ class AzureBatchOperator(BaseOperator):
         self.should_delete_pool = should_delete_pool
         self.hook = self.get_hook()
 
-    def _check_inputs(self):
+    def _check_inputs(self) -> Any:
 
         if self.use_latest_image:
             if not all(elem for elem in [self.vm_publisher, self.vm_offer, self.sku_starts_with]):
@@ -240,7 +240,8 @@ class AzureBatchOperator(BaseOperator):
             raise AirflowException("Some required parameters are missing. Please set "
                                    "all the required parameters.")
 
-    def execute(self, context):
+    def execute(self,
+                context: Dict[Any, Any]) -> None:
         self._check_inputs()
         self.hook.connection.config.retry_policy = self.batch_max_retries
 
@@ -305,7 +306,7 @@ class AzureBatchOperator(BaseOperator):
         )
         self.log.info("Azure Batch job (%s) terminated: %s", 
self.batch_job_id, response)
 
-    def get_hook(self):
+    def get_hook(self) -> AzureBatchHook:
         """
         Create and return an AzureBatchHook.
 
@@ -314,7 +315,9 @@ class AzureBatchOperator(BaseOperator):
             azure_batch_conn_id=self.azure_batch_conn_id
         )
 
-    def clean_up(self, pool_id=None, job_id=None):
+    def clean_up(self,
+                 pool_id: Optional[str] = None,
+                 job_id: Optional[str] = None) -> None:
         """
         Delete the given pool and job in the batch account
 
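
The effect of tightening Optional defaults is easiest to see with a type
checker. A standalone sketch mirroring two of the changed AzureBatchOperator
parameters (illustrative; this function is not part of the operator):

    def submit(timeout: int = 25,                # was Optional[int] = 25
               should_delete_job: bool = False,  # was Optional[bool] = False
               ) -> None:
        ...

    submit(timeout=60)    # ok
    submit(timeout=None)  # mypy: incompatible type "None"; expected "int"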
diff --git a/airflow/providers/microsoft/azure/operators/azure_container_instances.py b/airflow/providers/microsoft/azure/operators/azure_container_instances.py
index 056c4ba..06f2a34 100644
--- a/airflow/providers/microsoft/azure/operators/azure_container_instances.py
+++ b/airflow/providers/microsoft/azure/operators/azure_container_instances.py
@@ -19,7 +19,7 @@
 import re
 from collections import namedtuple
 from time import sleep
-from typing import Dict, Sequence
+from typing import Any, Dict, List, Optional, Sequence, Union
 
 from azure.mgmt.containerinstance.models import (
     Container, ContainerGroup, EnvironmentVariable, ResourceRequests, ResourceRequirements, VolumeMount,
@@ -69,10 +69,10 @@ class AzureContainerInstancesOperator(BaseOperator):
     :type region: str
     :param environment_variables: key,value pairs containing environment
         variables which will be passed to the running container
-    :type environment_variables: dict
+    :type environment_variables: Optional[dict]
     :param secured_variables: names of environmental variables that should not
         be exposed outside the container (typically passwords).
-    :type secured_variables: [str]
+    :type secured_variables: Optional[str]
     :param volumes: list of ``Volume`` tuples to be mounted to the container.
         Currently only Azure Fileshares are supported.
     :type volumes: list[<conn_id, account_name, share_name, mount_path, read_only>]
@@ -83,12 +83,12 @@ class AzureContainerInstancesOperator(BaseOperator):
     :param gpu: GPU Resource for the container.
     :type gpu: azure.mgmt.containerinstance.models.GpuResource
     :param command: the command to run inside the container
-    :type command: [str]
+    :type command: Optional[str]
     :param container_timeout: max time allowed for the execution of
         the container instance.
     :type container_timeout: datetime.timedelta
     :param tags: azure tags as dict of str:str
-    :type tags: dict[str, str]
+    :type tags: Optional[dict[str, str]]
 
     **Example**::
 
@@ -123,24 +123,24 @@ class AzureContainerInstancesOperator(BaseOperator):
     # pylint: disable=too-many-arguments
     @apply_defaults
     def __init__(self,
-                 ci_conn_id,
-                 registry_conn_id,
-                 resource_group,
-                 name,
-                 image,
-                 region,
-                 environment_variables=None,
-                 secured_variables=None,
-                 volumes=None,
-                 memory_in_gb=None,
-                 cpu=None,
-                 gpu=None,
-                 command=None,
-                 remove_on_error=True,
-                 fail_if_exists=True,
-                 tags=None,
+                 ci_conn_id: str,
+                 registry_conn_id: Optional[str],
+                 resource_group: str,
+                 name: str,
+                 image: str,
+                 region: str,
+                 environment_variables: Optional[Dict[Any, Any]] = None,
+                 secured_variables: Optional[str] = None,
+                 volumes: Optional[List[Any]] = None,
+                 memory_in_gb: Optional[Any] = None,
+                 cpu: Optional[Any] = None,
+                 gpu: Optional[Any] = None,
+                 command: Optional[str] = None,
+                 remove_on_error: bool = True,
+                 fail_if_exists: bool = True,
+                 tags: Optional[Dict[str, str]] = None,
                  *args,
-                 **kwargs):
+                 **kwargs) -> None:
         super().__init__(*args, **kwargs)
 
         self.ci_conn_id = ci_conn_id
@@ -158,10 +158,11 @@ class AzureContainerInstancesOperator(BaseOperator):
         self.command = command
         self.remove_on_error = remove_on_error
         self.fail_if_exists = fail_if_exists
-        self._ci_hook = None
+        self._ci_hook: Any = None
         self.tags = tags
 
-    def execute(self, context):
+    def execute(self,
+                context: Dict[Any, Any]) -> int:
         # Check name again in case it was templated.
         self._check_name(self.name)
 
@@ -174,7 +175,7 @@ class AzureContainerInstancesOperator(BaseOperator):
 
         if self.registry_conn_id:
             registry_hook = AzureContainerRegistryHook(self.registry_conn_id)
-            image_registry_credentials = [registry_hook.connection, ]
+            image_registry_credentials: Optional[List[Any]] = [registry_hook.connection, ]
         else:
             image_registry_credentials = None
 
@@ -186,8 +187,8 @@ class AzureContainerInstancesOperator(BaseOperator):
                 e = EnvironmentVariable(name=key, value=value)
             environment_variables.append(e)
 
-        volumes = []
-        volume_mounts = []
+        volumes: List[Union[Volume, Volume]] = []
+        volume_mounts: List[Union[VolumeMount, VolumeMount]] = []
         for conn_id, account_name, share_name, mount_path, read_only in self.volumes:
             hook = AzureContainerVolumeHook(conn_id)
 
@@ -250,7 +251,7 @@ class AzureContainerInstancesOperator(BaseOperator):
             if exit_code == 0 or self.remove_on_error:
                 self.on_kill()
 
-    def on_kill(self):
+    def on_kill(self) -> None:
         if self.remove_on_error:
             self.log.info("Deleting container group")
             try:
@@ -258,7 +259,7 @@ class AzureContainerInstancesOperator(BaseOperator):
             except Exception:  # pylint: disable=broad-except
                 self.log.exception("Could not delete container group")
 
-    def _monitor_logging(self, resource_group, name):
+    def _monitor_logging(self, resource_group: str, name: str) -> int:
         last_state = None
         last_message_logged = None
         last_line_logged = None
@@ -318,7 +319,9 @@ class AzureContainerInstancesOperator(BaseOperator):
 
         sleep(1)
 
-    def _log_last(self, logs, last_line_logged):
+    def _log_last(self,
+                  logs: Optional[List[Any]],
+                  last_line_logged: Any) -> Optional[Any]:
         if logs:
             # determine the last line which was logged before
             last_line_index = 0
@@ -336,7 +339,7 @@ class AzureContainerInstancesOperator(BaseOperator):
         return None
 
     @staticmethod
-    def _check_name(name):
+    def _check_name(name: str) -> str:
         if '{{' in name:
             # Let macros pass as they cannot be checked at construction time
             return name
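
An instantiation under the annotated constructor (an illustrative sketch, not
from the commit; the resource names, region and connection ids are made up):

    from airflow.providers.microsoft.azure.operators.azure_container_instances import (
        AzureContainerInstancesOperator,
    )

    aci_task = AzureContainerInstancesOperator(
        task_id='run_container',         # hypothetical task id
        ci_conn_id='azure_default',      # str
        registry_conn_id=None,           # Optional[str] per the new hints
        resource_group='my-resource-group',
        name='my-container-group',
        image='ubuntu:18.04',
        region='westeurope',
        tags={'team': 'data'},           # Optional[Dict[str, str]]
    )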
diff --git a/airflow/providers/microsoft/azure/operators/azure_cosmos.py b/airflow/providers/microsoft/azure/operators/azure_cosmos.py
index c46aae0..10bfc24 100644
--- a/airflow/providers/microsoft/azure/operators/azure_cosmos.py
+++ b/airflow/providers/microsoft/azure/operators/azure_cosmos.py
@@ -15,6 +15,7 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+from typing import Any, Dict
 
 from airflow.models import BaseOperator
 from airflow.providers.microsoft.azure.hooks.azure_cosmos import AzureCosmosDBHook
@@ -40,19 +41,19 @@ class AzureCosmosInsertDocumentOperator(BaseOperator):
 
     @apply_defaults
     def __init__(self,
-                 database_name,
-                 collection_name,
-                 document,
-                 azure_cosmos_conn_id='azure_cosmos_default',
+                 database_name: str,
+                 collection_name: str,
+                 document: dict,
+                 azure_cosmos_conn_id: str = 'azure_cosmos_default',
                  *args,
-                 **kwargs):
+                 **kwargs) -> None:
         super().__init__(*args, **kwargs)
         self.database_name = database_name
         self.collection_name = collection_name
         self.document = document
         self.azure_cosmos_conn_id = azure_cosmos_conn_id
 
-    def execute(self, context):
+    def execute(self, context: Dict[Any, Any]) -> None:
         # Create the hook
         hook = AzureCosmosDBHook(azure_cosmos_conn_id=self.azure_cosmos_conn_id)
 
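
Usage under the new hints (an illustrative sketch; the database, collection
and document values are assumptions):

    from airflow.providers.microsoft.azure.operators.azure_cosmos import AzureCosmosInsertDocumentOperator

    insert_doc = AzureCosmosInsertDocumentOperator(
        task_id='insert_cosmos_doc',                  # hypothetical task id
        database_name='airflow_db',                   # str
        collection_name='events',                     # str
        document={'id': '42', 'state': 'done'},       # dict
        azure_cosmos_conn_id='azure_cosmos_default',  # matches the annotated default
    )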
diff --git a/airflow/providers/microsoft/azure/operators/wasb_delete_blob.py b/airflow/providers/microsoft/azure/operators/wasb_delete_blob.py
index 79bc141..b669e63 100644
--- a/airflow/providers/microsoft/azure/operators/wasb_delete_blob.py
+++ b/airflow/providers/microsoft/azure/operators/wasb_delete_blob.py
@@ -16,6 +16,8 @@
 # specific language governing permissions and limitations
 # under the License.
 #
+from typing import Any, Dict
+
 from airflow.models import BaseOperator
 from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
 from airflow.utils.decorators import apply_defaults
@@ -43,11 +45,15 @@ class WasbDeleteBlobOperator(BaseOperator):
     template_fields = ('container_name', 'blob_name')
 
     @apply_defaults
-    def __init__(self, container_name, blob_name,
-                 wasb_conn_id='wasb_default', check_options=None,
-                 is_prefix=False, ignore_if_missing=False,
+    def __init__(self,
+                 container_name: str,
+                 blob_name: str,
+                 wasb_conn_id: str = 'wasb_default',
+                 check_options: Any = None,
+                 is_prefix: bool = False,
+                 ignore_if_missing: bool = False,
                  *args,
-                 **kwargs):
+                 **kwargs) -> None:
         super().__init__(*args, **kwargs)
         if check_options is None:
             check_options = {}
@@ -58,7 +64,7 @@ class WasbDeleteBlobOperator(BaseOperator):
         self.is_prefix = is_prefix
         self.ignore_if_missing = ignore_if_missing
 
-    def execute(self, context):
+    def execute(self, context: Dict[Any, Any]) -> None:
         self.log.info(
             'Deleting blob: %s\nin wasb://%s', self.blob_name, self.container_name
         )
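
Illustrative usage (a sketch; the container and blob names are made up):

    from airflow.providers.microsoft.azure.operators.wasb_delete_blob import WasbDeleteBlobOperator

    delete_blobs = WasbDeleteBlobOperator(
        task_id='delete_blobs',         # hypothetical task id
        container_name='my-container',  # str
        blob_name='data/2020-07-12/',   # str
        is_prefix=True,                 # bool: treat blob_name as a prefix
        ignore_if_missing=True,         # bool: no error if nothing matches
    )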
diff --git a/airflow/providers/microsoft/azure/sensors/azure_cosmos.py b/airflow/providers/microsoft/azure/sensors/azure_cosmos.py
index 87ed643..d7f2616 100644
--- a/airflow/providers/microsoft/azure/sensors/azure_cosmos.py
+++ b/airflow/providers/microsoft/azure/sensors/azure_cosmos.py
@@ -15,6 +15,8 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+from typing import Any, Dict
+
 from airflow.providers.microsoft.azure.hooks.azure_cosmos import AzureCosmosDBHook
 from airflow.sensors.base_sensor_operator import BaseSensorOperator
 from airflow.utils.decorators import apply_defaults
@@ -45,19 +47,19 @@ class AzureCosmosDocumentSensor(BaseSensorOperator):
     @apply_defaults
     def __init__(
             self,
-            database_name,
-            collection_name,
-            document_id,
-            azure_cosmos_conn_id="azure_cosmos_default",
+            database_name: str,
+            collection_name: str,
+            document_id: str,
+            azure_cosmos_conn_id: str = "azure_cosmos_default",
             *args,
-            **kwargs):
+            **kwargs) -> None:
         super().__init__(*args, **kwargs)
         self.azure_cosmos_conn_id = azure_cosmos_conn_id
         self.database_name = database_name
         self.collection_name = collection_name
         self.document_id = document_id
 
-    def poke(self, context):
+    def poke(self, context: Dict[Any, Any]) -> bool:
         self.log.info("*** Intering poke")
         hook = AzureCosmosDBHook(self.azure_cosmos_conn_id)
         return hook.get_document(self.document_id, self.database_name, self.collection_name) is not None
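
Illustrative usage of the annotated sensor (a sketch; the names and ids are
assumptions):

    from airflow.providers.microsoft.azure.sensors.azure_cosmos import AzureCosmosDocumentSensor

    wait_for_doc = AzureCosmosDocumentSensor(
        task_id='wait_for_document',  # hypothetical task id
        database_name='airflow_db',
        collection_name='events',
        document_id='42',             # poke() now declares -> bool
    )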
diff --git a/airflow/providers/microsoft/azure/sensors/wasb.py b/airflow/providers/microsoft/azure/sensors/wasb.py
index 9708ee1..f689ed0 100644
--- a/airflow/providers/microsoft/azure/sensors/wasb.py
+++ b/airflow/providers/microsoft/azure/sensors/wasb.py
@@ -16,6 +16,8 @@
 # specific language governing permissions and limitations
 # under the License.
 #
+from typing import Any, Dict, Optional
+
 from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
 from airflow.sensors.base_sensor_operator import BaseSensorOperator
 from airflow.utils.decorators import apply_defaults
@@ -39,8 +41,12 @@ class WasbBlobSensor(BaseSensorOperator):
     template_fields = ('container_name', 'blob_name')
 
     @apply_defaults
-    def __init__(self, container_name, blob_name,
-                 wasb_conn_id='wasb_default', check_options=None, *args,
+    def __init__(self,
+                 container_name: str,
+                 blob_name: str,
+                 wasb_conn_id: str = 'wasb_default',
+                 check_options: Optional[dict] = None,
+                 *args,
                  **kwargs):
         super().__init__(*args, **kwargs)
         if check_options is None:
@@ -50,7 +56,7 @@ class WasbBlobSensor(BaseSensorOperator):
         self.blob_name = blob_name
         self.check_options = check_options
 
-    def poke(self, context):
+    def poke(self, context: Dict[Any, Any]) -> bool:
         self.log.info(
             'Poking for blob: %s\nin wasb://%s', self.blob_name, self.container_name
         )
@@ -77,8 +83,13 @@ class WasbPrefixSensor(BaseSensorOperator):
     template_fields = ('container_name', 'prefix')
 
     @apply_defaults
-    def __init__(self, container_name, prefix, wasb_conn_id='wasb_default',
-                 check_options=None, *args, **kwargs):
+    def __init__(self,
+                 container_name: str,
+                 prefix: str,
+                 wasb_conn_id: str = 'wasb_default',
+                 check_options: Optional[dict] = None,
+                 *args,
+                 **kwargs) -> None:
         super().__init__(*args, **kwargs)
         if check_options is None:
             check_options = {}
@@ -87,7 +98,8 @@ class WasbPrefixSensor(BaseSensorOperator):
         self.prefix = prefix
         self.check_options = check_options
 
-    def poke(self, context):
+    def poke(self,
+             context: Dict[Any, Any]) -> bool:
         self.log.info('Poking for prefix: %s in wasb://%s', self.prefix, self.container_name)
         hook = WasbHook(wasb_conn_id=self.wasb_conn_id)
         return hook.check_for_prefix(self.container_name, self.prefix,
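
Illustrative usage of both sensors under the new signatures (a sketch; the
names are made up):

    from airflow.providers.microsoft.azure.sensors.wasb import WasbBlobSensor, WasbPrefixSensor

    wait_blob = WasbBlobSensor(
        task_id='wait_for_blob',       # hypothetical task id
        container_name='my-container',
        blob_name='input/data.csv',
        check_options=None,            # Optional[dict], defaults to {}
    )

    wait_prefix = WasbPrefixSensor(
        task_id='wait_for_prefix',
        container_name='my-container',
        prefix='input/2020-07-12/',
    )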
diff --git a/airflow/providers/microsoft/azure/transfers/file_to_wasb.py b/airflow/providers/microsoft/azure/transfers/file_to_wasb.py
index b43017f..64cdbc0 100644
--- a/airflow/providers/microsoft/azure/transfers/file_to_wasb.py
+++ b/airflow/providers/microsoft/azure/transfers/file_to_wasb.py
@@ -16,6 +16,8 @@
 # specific language governing permissions and limitations
 # under the License.
 #
+from typing import Any, Dict, Optional
+
 from airflow.models import BaseOperator
 from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
 from airflow.utils.decorators import apply_defaults
@@ -35,14 +37,19 @@ class FileToWasbOperator(BaseOperator):
     :type wasb_conn_id: str
     :param load_options: Optional keyword arguments that
         `WasbHook.load_file()` takes.
-    :type load_options: dict
+    :type load_options: Optional[dict]
     """
     template_fields = ('file_path', 'container_name', 'blob_name')
 
     @apply_defaults
-    def __init__(self, file_path, container_name, blob_name,
-                 wasb_conn_id='wasb_default', load_options=None, *args,
-                 **kwargs):
+    def __init__(self,
+                 file_path: str,
+                 container_name: str,
+                 blob_name: str,
+                 wasb_conn_id: str = 'wasb_default',
+                 load_options: Optional[dict] = None,
+                 *args,
+                 **kwargs) -> None:
         super().__init__(*args, **kwargs)
         if load_options is None:
             load_options = {}
@@ -52,7 +59,8 @@ class FileToWasbOperator(BaseOperator):
         self.wasb_conn_id = wasb_conn_id
         self.load_options = load_options
 
-    def execute(self, context):
+    def execute(self,
+                context: Dict[Any, Any]) -> None:
         """Upload a file to Azure Blob Storage."""
         hook = WasbHook(wasb_conn_id=self.wasb_conn_id)
         self.log.info(
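
Illustrative usage (a sketch; the local path and blob layout are assumptions,
and the load_options key shown is an assumed WasbHook.load_file() kwarg):

    from airflow.providers.microsoft.azure.transfers.file_to_wasb import FileToWasbOperator

    upload = FileToWasbOperator(
        task_id='upload_to_wasb',        # hypothetical task id
        file_path='/tmp/report.csv',     # str
        container_name='my-container',   # str
        blob_name='reports/report.csv',  # str
        load_options={'timeout': 120},   # Optional[dict] forwarded to WasbHook.load_file()
    )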
diff --git a/airflow/providers/microsoft/azure/transfers/oracle_to_azure_data_lake.py b/airflow/providers/microsoft/azure/transfers/oracle_to_azure_data_lake.py
index 11013d5..65c128c 100644
--- a/airflow/providers/microsoft/azure/transfers/oracle_to_azure_data_lake.py
+++ b/airflow/providers/microsoft/azure/transfers/oracle_to_azure_data_lake.py
@@ -18,6 +18,7 @@
 
 import os
 from tempfile import TemporaryDirectory
+from typing import Any, Dict, Optional, Union
 
 import unicodecsv as csv
 
@@ -44,7 +45,7 @@ class OracleToAzureDataLakeOperator(BaseOperator):
     :param sql: SQL query to execute against the Oracle database. (templated)
     :type sql: str
     :param sql_params: Parameters to use in sql query. (templated)
-    :type sql_params: str
+    :type sql_params: Optional[dict]
     :param delimiter: field delimiter in the file.
     :type delimiter: str
     :param encoding: encoding type for the file.
@@ -62,17 +63,18 @@ class OracleToAzureDataLakeOperator(BaseOperator):
     @apply_defaults
     def __init__(
             self,
-            filename,
-            azure_data_lake_conn_id,
-            azure_data_lake_path,
-            oracle_conn_id,
-            sql,
-            sql_params=None,
-            delimiter=",",
-            encoding="utf-8",
-            quotechar='"',
-            quoting=csv.QUOTE_MINIMAL,
-            *args, **kwargs):
+            filename: str,
+            azure_data_lake_conn_id: str,
+            azure_data_lake_path: str,
+            oracle_conn_id: str,
+            sql: str,
+            sql_params: Optional[dict] = None,
+            delimiter: str = ",",
+            encoding: str = "utf-8",
+            quotechar: str = '"',
+            quoting: int = csv.QUOTE_MINIMAL,
+            *args,
+            **kwargs) -> None:
         super().__init__(*args, **kwargs)
         if sql_params is None:
             sql_params = {}
@@ -87,7 +89,9 @@ class OracleToAzureDataLakeOperator(BaseOperator):
         self.quotechar = quotechar
         self.quoting = quoting
 
-    def _write_temp_file(self, cursor, path_to_save):
+    def _write_temp_file(self,
+                         cursor: Any,
+                         path_to_save: Union[str, bytes, int]) -> None:
         with open(path_to_save, 'wb') as csvfile:
             csv_writer = csv.writer(csvfile, delimiter=self.delimiter,
                                     encoding=self.encoding, quotechar=self.quotechar,
@@ -96,7 +100,8 @@ class OracleToAzureDataLakeOperator(BaseOperator):
             csv_writer.writerows(cursor)
             csvfile.flush()
 
-    def execute(self, context):
+    def execute(self,
+                context: Dict[Any, Any]) -> None:
         oracle_hook = OracleHook(oracle_conn_id=self.oracle_conn_id)
         azure_data_lake_hook = AzureDataLakeHook(
             azure_data_lake_conn_id=self.azure_data_lake_conn_id)
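
Illustrative usage under the annotated constructor (a sketch; the SQL, paths
and connection ids are assumptions):

    from airflow.providers.microsoft.azure.transfers.oracle_to_azure_data_lake import (
        OracleToAzureDataLakeOperator,
    )

    oracle_to_adl = OracleToAzureDataLakeOperator(
        task_id='oracle_to_adl',                          # hypothetical task id
        filename='extract.csv',                           # str
        azure_data_lake_conn_id='azure_data_lake_default',
        azure_data_lake_path='raw/oracle',                # str
        oracle_conn_id='oracle_default',
        sql='SELECT * FROM my_table WHERE id > :min_id',  # str (templated)
        sql_params={'min_id': 0},                         # Optional[dict], defaults to {}
    )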
