This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new d3c76da Improve type hinting to provider microsoft (#9774)
d3c76da is described below
commit d3c76da95250068161580036a86e26ee2790fa07
Author: Guilherme Da Silva Gonçalves <[email protected]>
AuthorDate: Sun Jul 12 13:25:49 2020 -0300
Improve type hinting to provider microsoft (#9774)
---
.../microsoft/azure/operators/adls_list.py | 11 ++--
airflow/providers/microsoft/azure/operators/adx.py | 4 +-
.../microsoft/azure/operators/azure_batch.py | 45 ++++++++-------
.../azure/operators/azure_container_instances.py | 65 +++++++++++-----------
.../microsoft/azure/operators/azure_cosmos.py | 13 +++--
.../microsoft/azure/operators/wasb_delete_blob.py | 16 ++++--
.../microsoft/azure/sensors/azure_cosmos.py | 14 +++--
airflow/providers/microsoft/azure/sensors/wasb.py | 24 ++++++--
.../microsoft/azure/transfers/file_to_wasb.py | 18 ++++--
.../azure/transfers/oracle_to_azure_data_lake.py | 33 ++++++-----
10 files changed, 142 insertions(+), 101 deletions(-)
diff --git a/airflow/providers/microsoft/azure/operators/adls_list.py
b/airflow/providers/microsoft/azure/operators/adls_list.py
index 8832bff..f184122 100644
--- a/airflow/providers/microsoft/azure/operators/adls_list.py
+++ b/airflow/providers/microsoft/azure/operators/adls_list.py
@@ -16,7 +16,7 @@
# specific language governing permissions and limitations
# under the License.
-from typing import Iterable
+from typing import Any, Dict, Iterable, List
from airflow.models import BaseOperator
from airflow.providers.microsoft.azure.hooks.azure_data_lake import
AzureDataLakeHook
@@ -52,15 +52,16 @@ class AzureDataLakeStorageListOperator(BaseOperator):
@apply_defaults
def __init__(self,
- path,
- azure_data_lake_conn_id='azure_data_lake_default',
+ path: str,
+ azure_data_lake_conn_id: str = 'azure_data_lake_default',
*args,
- **kwargs):
+ **kwargs) -> None:
super().__init__(*args, **kwargs)
self.path = path
self.azure_data_lake_conn_id = azure_data_lake_conn_id
- def execute(self, context):
+ def execute(self,
+ context: Dict[Any, Any]) -> List:
hook = AzureDataLakeHook(
azure_data_lake_conn_id=self.azure_data_lake_conn_id
diff --git a/airflow/providers/microsoft/azure/operators/adx.py
b/airflow/providers/microsoft/azure/operators/adx.py
index 0f64bb1..ebd1e95 100644
--- a/airflow/providers/microsoft/azure/operators/adx.py
+++ b/airflow/providers/microsoft/azure/operators/adx.py
@@ -18,7 +18,7 @@
#
"""This module contains Azure Data Explorer operators"""
-from typing import Dict, Optional
+from typing import Any, Dict, Optional
from azure.kusto.data._models import KustoResultTable
@@ -65,7 +65,7 @@ class AzureDataExplorerQueryOperator(BaseOperator):
"""Returns new instance of AzureDataExplorerHook"""
return AzureDataExplorerHook(self.azure_data_explorer_conn_id)
- def execute(self, context) -> KustoResultTable:
+ def execute(self, context: Dict[Any, Any]) -> KustoResultTable:
"""
Run KQL Query on Azure Data Explorer (Kusto).
Returns `PrimaryResult` of Query v2 HTTP response contents
diff --git a/airflow/providers/microsoft/azure/operators/azure_batch.py
b/airflow/providers/microsoft/azure/operators/azure_batch.py
index b919be9..ff3f35a 100644
--- a/airflow/providers/microsoft/azure/operators/azure_batch.py
+++ b/airflow/providers/microsoft/azure/operators/azure_batch.py
@@ -16,7 +16,7 @@
# specific language governing permissions and limitations
# under the License.
#
-from typing import List, Optional
+from typing import Any, Dict, List, Optional
from azure.batch import models as batch_models
@@ -79,8 +79,8 @@ class AzureBatchOperator(BaseOperator):
:type batch_start_task: Optional[batch_models.StartTask]
:param batch_max_retries: The number of times to retry this batch
operation before it's
- considered a failed operation
- :type batch_max_retries: Optional[int]
+ considered a failed operation. Default is 3
+ :type batch_max_retries: int
:param batch_task_resource_files: A list of files that the Batch service
will
download to the Compute Node before running the command line.
@@ -102,8 +102,8 @@ class AzureBatchOperator(BaseOperator):
This property must not be specified if enable_auto_scale is set to
true.
:type target_dedicated_nodes: Optional[int]
- :param enable_auto_scale: Whether the Pool size should automatically
adjust over time
- :type enable_auto_scale: Optional[bool]
+ :param enable_auto_scale: Whether the Pool size should automatically
adjust over time. Default is false
+ :type enable_auto_scale: bool
:param auto_scale_formula: A formula for the desired number of Compute
Nodes in the Pool.
This property must not be specified if enableAutoScale is set to false.
@@ -114,8 +114,8 @@ class AzureBatchOperator(BaseOperator):
:type azure_batch_conn_id: str
:param use_latest_verified_vm_image_and_sku: Whether to use the latest
verified virtual
- machine image and sku in the batch account
- :type use_latest_verified_vm_image_and_sku: Optional[bool]
+ machine image and sku in the batch account. Default is false.
+ :type use_latest_verified_vm_image_and_sku: bool
:param vm_publisher: The publisher of the Azure Virtual Machines
Marketplace Image.
For example, Canonical or MicrosoftWindowsServer. Required if
@@ -131,14 +131,14 @@ class AzureBatchOperator(BaseOperator):
use_latest_image_and_sku is set to True
:type sku_starts_with: Optional[str]
- :param timeout: The amount of time to wait for the job to complete in
minutes
- :type timeout: Optional[int]
+ :param timeout: The amount of time to wait for the job to complete in
minutes. Default is 25
+ :type timeout: int
:param should_delete_job: Whether to delete job after execution. Default
is False
- :type should_delete_job: Optional[bool]
+ :type should_delete_job: bool
:param should_delete_pool: Whether to delete pool after execution of jobs.
Default is False
- :type should_delete_pool: Optional[bool]
+ :type should_delete_pool: bool
"""
@@ -162,22 +162,22 @@ class AzureBatchOperator(BaseOperator):
batch_task_display_name: Optional[str] = None,
batch_task_container_settings:
Optional[batch_models.TaskContainerSettings] = None,
batch_start_task: Optional[batch_models.StartTask] = None,
- batch_max_retries: Optional[int] = 3,
+ batch_max_retries: int = 3,
batch_task_resource_files:
Optional[List[batch_models.ResourceFile]] = None,
batch_task_output_files:
Optional[List[batch_models.OutputFile]] = None,
batch_task_user_identity: Optional[batch_models.UserIdentity]
= None,
target_low_priority_nodes: Optional[int] = None,
target_dedicated_nodes: Optional[int] = None,
- enable_auto_scale: Optional[bool] = False,
+ enable_auto_scale: bool = False,
auto_scale_formula: Optional[str] = None,
azure_batch_conn_id='azure_batch_default',
- use_latest_verified_vm_image_and_sku: Optional[bool] = False,
+ use_latest_verified_vm_image_and_sku: bool = False,
vm_publisher: Optional[str] = None,
vm_offer: Optional[str] = None,
sku_starts_with: Optional[str] = None,
- timeout: Optional[int] = 25,
- should_delete_job: Optional[bool] = False,
- should_delete_pool: Optional[bool] = False,
+ timeout: int = 25,
+ should_delete_job: bool = False,
+ should_delete_pool: bool = False,
*args,
**kwargs) -> None:
@@ -213,7 +213,7 @@ class AzureBatchOperator(BaseOperator):
self.should_delete_pool = should_delete_pool
self.hook = self.get_hook()
- def _check_inputs(self):
+ def _check_inputs(self) -> Any:
if self.use_latest_image:
if not all(elem for elem in [self.vm_publisher, self.vm_offer,
self.sku_starts_with]):
@@ -240,7 +240,8 @@ class AzureBatchOperator(BaseOperator):
raise AirflowException("Some required parameters are
missing. Please set "
"all the required parameters.")
- def execute(self, context):
+ def execute(self,
+ context: Dict[Any, Any]) -> None:
self._check_inputs()
self.hook.connection.config.retry_policy = self.batch_max_retries
@@ -305,7 +306,7 @@ class AzureBatchOperator(BaseOperator):
)
self.log.info("Azure Batch job (%s) terminated: %s",
self.batch_job_id, response)
- def get_hook(self):
+ def get_hook(self) -> AzureBatchHook:
"""
Create and return an AzureBatchHook.
@@ -314,7 +315,9 @@ class AzureBatchOperator(BaseOperator):
azure_batch_conn_id=self.azure_batch_conn_id
)
- def clean_up(self, pool_id=None, job_id=None):
+ def clean_up(self,
+ pool_id: Optional[str] = None,
+ job_id: Optional[str] = None) -> None:
"""
Delete the given pool and job in the batch account
diff --git
a/airflow/providers/microsoft/azure/operators/azure_container_instances.py
b/airflow/providers/microsoft/azure/operators/azure_container_instances.py
index 056c4ba..06f2a34 100644
--- a/airflow/providers/microsoft/azure/operators/azure_container_instances.py
+++ b/airflow/providers/microsoft/azure/operators/azure_container_instances.py
@@ -19,7 +19,7 @@
import re
from collections import namedtuple
from time import sleep
-from typing import Dict, Sequence
+from typing import Any, Dict, List, Optional, Sequence, Union
from azure.mgmt.containerinstance.models import (
Container, ContainerGroup, EnvironmentVariable, ResourceRequests,
ResourceRequirements, VolumeMount,
@@ -69,10 +69,10 @@ class AzureContainerInstancesOperator(BaseOperator):
:type region: str
:param environment_variables: key,value pairs containing environment
variables which will be passed to the running container
- :type environment_variables: dict
+ :type environment_variables: Optional[dict]
:param secured_variables: names of environmental variables that should not
be exposed outside the container (typically passwords).
- :type secured_variables: [str]
+ :type secured_variables: Optional[str]
:param volumes: list of ``Volume`` tuples to be mounted to the container.
Currently only Azure Fileshares are supported.
:type volumes: list[<conn_id, account_name, share_name, mount_path,
read_only>]
@@ -83,12 +83,12 @@ class AzureContainerInstancesOperator(BaseOperator):
:param gpu: GPU Resource for the container.
:type gpu: azure.mgmt.containerinstance.models.GpuResource
:param command: the command to run inside the container
- :type command: [str]
+ :type command: Optional[str]
:param container_timeout: max time allowed for the execution of
the container instance.
:type container_timeout: datetime.timedelta
:param tags: azure tags as dict of str:str
- :type tags: dict[str, str]
+ :type tags: Optional[dict[str, str]]
**Example**::
@@ -123,24 +123,24 @@ class AzureContainerInstancesOperator(BaseOperator):
# pylint: disable=too-many-arguments
@apply_defaults
def __init__(self,
- ci_conn_id,
- registry_conn_id,
- resource_group,
- name,
- image,
- region,
- environment_variables=None,
- secured_variables=None,
- volumes=None,
- memory_in_gb=None,
- cpu=None,
- gpu=None,
- command=None,
- remove_on_error=True,
- fail_if_exists=True,
- tags=None,
+ ci_conn_id: str,
+ registry_conn_id: Optional[str],
+ resource_group: str,
+ name: str,
+ image: str,
+ region: str,
+ environment_variables: Optional[Dict[Any, Any]] = None,
+ secured_variables: Optional[str] = None,
+ volumes: Optional[List[Any]] = None,
+ memory_in_gb: Optional[Any] = None,
+ cpu: Optional[Any] = None,
+ gpu: Optional[Any] = None,
+ command: Optional[str] = None,
+ remove_on_error: bool = True,
+ fail_if_exists: bool = True,
+ tags: Optional[Dict[str, str]] = None,
*args,
- **kwargs):
+ **kwargs) -> None:
super().__init__(*args, **kwargs)
self.ci_conn_id = ci_conn_id
@@ -158,10 +158,11 @@ class AzureContainerInstancesOperator(BaseOperator):
self.command = command
self.remove_on_error = remove_on_error
self.fail_if_exists = fail_if_exists
- self._ci_hook = None
+ self._ci_hook: Any = None
self.tags = tags
- def execute(self, context):
+ def execute(self,
+ context: Dict[Any, Any]) -> int:
# Check name again in case it was templated.
self._check_name(self.name)
@@ -174,7 +175,7 @@ class AzureContainerInstancesOperator(BaseOperator):
if self.registry_conn_id:
registry_hook = AzureContainerRegistryHook(self.registry_conn_id)
- image_registry_credentials = [registry_hook.connection, ]
+ image_registry_credentials: Optional[List[Any]] =
[registry_hook.connection, ]
else:
image_registry_credentials = None
@@ -186,8 +187,8 @@ class AzureContainerInstancesOperator(BaseOperator):
e = EnvironmentVariable(name=key, value=value)
environment_variables.append(e)
- volumes = []
- volume_mounts = []
+ volumes: List[Union[Volume, Volume]] = []
+ volume_mounts: List[Union[VolumeMount, VolumeMount]] = []
for conn_id, account_name, share_name, mount_path, read_only in
self.volumes:
hook = AzureContainerVolumeHook(conn_id)
@@ -250,7 +251,7 @@ class AzureContainerInstancesOperator(BaseOperator):
if exit_code == 0 or self.remove_on_error:
self.on_kill()
- def on_kill(self):
+ def on_kill(self) -> None:
if self.remove_on_error:
self.log.info("Deleting container group")
try:
@@ -258,7 +259,7 @@ class AzureContainerInstancesOperator(BaseOperator):
except Exception: # pylint: disable=broad-except
self.log.exception("Could not delete container group")
- def _monitor_logging(self, resource_group, name):
+ def _monitor_logging(self, resource_group: str, name: str) -> int:
last_state = None
last_message_logged = None
last_line_logged = None
@@ -318,7 +319,9 @@ class AzureContainerInstancesOperator(BaseOperator):
sleep(1)
- def _log_last(self, logs, last_line_logged):
+ def _log_last(self,
+ logs: Optional[List[Any]],
+ last_line_logged: Any) -> Optional[Any]:
if logs:
# determine the last line which was logged before
last_line_index = 0
@@ -336,7 +339,7 @@ class AzureContainerInstancesOperator(BaseOperator):
return None
@staticmethod
- def _check_name(name):
+ def _check_name(name: str) -> str:
if '{{' in name:
# Let macros pass as they cannot be checked at construction time
return name
diff --git a/airflow/providers/microsoft/azure/operators/azure_cosmos.py
b/airflow/providers/microsoft/azure/operators/azure_cosmos.py
index c46aae0..10bfc24 100644
--- a/airflow/providers/microsoft/azure/operators/azure_cosmos.py
+++ b/airflow/providers/microsoft/azure/operators/azure_cosmos.py
@@ -15,6 +15,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+from typing import Any, Dict
from airflow.models import BaseOperator
from airflow.providers.microsoft.azure.hooks.azure_cosmos import
AzureCosmosDBHook
@@ -40,19 +41,19 @@ class AzureCosmosInsertDocumentOperator(BaseOperator):
@apply_defaults
def __init__(self,
- database_name,
- collection_name,
- document,
- azure_cosmos_conn_id='azure_cosmos_default',
+ database_name: str,
+ collection_name: str,
+ document: dict,
+ azure_cosmos_conn_id: str = 'azure_cosmos_default',
*args,
- **kwargs):
+ **kwargs) -> None:
super().__init__(*args, **kwargs)
self.database_name = database_name
self.collection_name = collection_name
self.document = document
self.azure_cosmos_conn_id = azure_cosmos_conn_id
- def execute(self, context):
+ def execute(self, context: Dict[Any, Any]) -> None:
# Create the hook
hook =
AzureCosmosDBHook(azure_cosmos_conn_id=self.azure_cosmos_conn_id)
diff --git a/airflow/providers/microsoft/azure/operators/wasb_delete_blob.py
b/airflow/providers/microsoft/azure/operators/wasb_delete_blob.py
index 79bc141..b669e63 100644
--- a/airflow/providers/microsoft/azure/operators/wasb_delete_blob.py
+++ b/airflow/providers/microsoft/azure/operators/wasb_delete_blob.py
@@ -16,6 +16,8 @@
# specific language governing permissions and limitations
# under the License.
#
+from typing import Any, Dict
+
from airflow.models import BaseOperator
from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
from airflow.utils.decorators import apply_defaults
@@ -43,11 +45,15 @@ class WasbDeleteBlobOperator(BaseOperator):
template_fields = ('container_name', 'blob_name')
@apply_defaults
- def __init__(self, container_name, blob_name,
- wasb_conn_id='wasb_default', check_options=None,
- is_prefix=False, ignore_if_missing=False,
+ def __init__(self,
+ container_name: str,
+ blob_name: str,
+ wasb_conn_id: str = 'wasb_default',
+ check_options: Any = None,
+ is_prefix: bool = False,
+ ignore_if_missing: bool = False,
*args,
- **kwargs):
+ **kwargs) -> None:
super().__init__(*args, **kwargs)
if check_options is None:
check_options = {}
@@ -58,7 +64,7 @@ class WasbDeleteBlobOperator(BaseOperator):
self.is_prefix = is_prefix
self.ignore_if_missing = ignore_if_missing
- def execute(self, context):
+ def execute(self, context: Dict[Any, Any]) -> None:
self.log.info(
'Deleting blob: %s\nin wasb://%s', self.blob_name,
self.container_name
)
diff --git a/airflow/providers/microsoft/azure/sensors/azure_cosmos.py
b/airflow/providers/microsoft/azure/sensors/azure_cosmos.py
index 87ed643..d7f2616 100644
--- a/airflow/providers/microsoft/azure/sensors/azure_cosmos.py
+++ b/airflow/providers/microsoft/azure/sensors/azure_cosmos.py
@@ -15,6 +15,8 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+from typing import Any, Dict
+
from airflow.providers.microsoft.azure.hooks.azure_cosmos import
AzureCosmosDBHook
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
@@ -45,19 +47,19 @@ class AzureCosmosDocumentSensor(BaseSensorOperator):
@apply_defaults
def __init__(
self,
- database_name,
- collection_name,
- document_id,
- azure_cosmos_conn_id="azure_cosmos_default",
+ database_name: str,
+ collection_name: str,
+ document_id: str,
+ azure_cosmos_conn_id: str = "azure_cosmos_default",
*args,
- **kwargs):
+ **kwargs) -> None:
super().__init__(*args, **kwargs)
self.azure_cosmos_conn_id = azure_cosmos_conn_id
self.database_name = database_name
self.collection_name = collection_name
self.document_id = document_id
- def poke(self, context):
+ def poke(self, context: Dict[Any, Any]) -> bool:
self.log.info("*** Entering poke")
hook = AzureCosmosDBHook(self.azure_cosmos_conn_id)
return hook.get_document(self.document_id, self.database_name,
self.collection_name) is not None
diff --git a/airflow/providers/microsoft/azure/sensors/wasb.py
b/airflow/providers/microsoft/azure/sensors/wasb.py
index 9708ee1..f689ed0 100644
--- a/airflow/providers/microsoft/azure/sensors/wasb.py
+++ b/airflow/providers/microsoft/azure/sensors/wasb.py
@@ -16,6 +16,8 @@
# specific language governing permissions and limitations
# under the License.
#
+from typing import Any, Dict, Optional
+
from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
@@ -39,8 +41,12 @@ class WasbBlobSensor(BaseSensorOperator):
template_fields = ('container_name', 'blob_name')
@apply_defaults
- def __init__(self, container_name, blob_name,
- wasb_conn_id='wasb_default', check_options=None, *args,
+ def __init__(self,
+ container_name: str,
+ blob_name: str,
+ wasb_conn_id: str = 'wasb_default',
+ check_options: Optional[dict] = None,
+ *args,
**kwargs):
super().__init__(*args, **kwargs)
if check_options is None:
@@ -50,7 +56,7 @@ class WasbBlobSensor(BaseSensorOperator):
self.blob_name = blob_name
self.check_options = check_options
- def poke(self, context):
+ def poke(self, context: Dict[Any, Any]):
self.log.info(
'Poking for blob: %s\nin wasb://%s', self.blob_name,
self.container_name
)
@@ -77,8 +83,13 @@ class WasbPrefixSensor(BaseSensorOperator):
template_fields = ('container_name', 'prefix')
@apply_defaults
- def __init__(self, container_name, prefix, wasb_conn_id='wasb_default',
- check_options=None, *args, **kwargs):
+ def __init__(self,
+ container_name: str,
+ prefix: str,
+ wasb_conn_id: str = 'wasb_default',
+ check_options: Optional[dict] = None,
+ *args,
+ **kwargs) -> None:
super().__init__(*args, **kwargs)
if check_options is None:
check_options = {}
@@ -87,7 +98,8 @@ class WasbPrefixSensor(BaseSensorOperator):
self.prefix = prefix
self.check_options = check_options
- def poke(self, context):
+ def poke(self,
+ context: Dict[Any, Any]) -> bool:
self.log.info('Poking for prefix: %s in wasb://%s', self.prefix,
self.container_name)
hook = WasbHook(wasb_conn_id=self.wasb_conn_id)
return hook.check_for_prefix(self.container_name, self.prefix,
diff --git a/airflow/providers/microsoft/azure/transfers/file_to_wasb.py
b/airflow/providers/microsoft/azure/transfers/file_to_wasb.py
index b43017f..64cdbc0 100644
--- a/airflow/providers/microsoft/azure/transfers/file_to_wasb.py
+++ b/airflow/providers/microsoft/azure/transfers/file_to_wasb.py
@@ -16,6 +16,8 @@
# specific language governing permissions and limitations
# under the License.
#
+from typing import Any, Dict, Optional
+
from airflow.models import BaseOperator
from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
from airflow.utils.decorators import apply_defaults
@@ -35,14 +37,19 @@ class FileToWasbOperator(BaseOperator):
:type wasb_conn_id: str
:param load_options: Optional keyword arguments that
`WasbHook.load_file()` takes.
- :type load_options: dict
+ :type load_options: Optional[dict]
"""
template_fields = ('file_path', 'container_name', 'blob_name')
@apply_defaults
- def __init__(self, file_path, container_name, blob_name,
- wasb_conn_id='wasb_default', load_options=None, *args,
- **kwargs):
+ def __init__(self,
+ file_path: str,
+ container_name: str,
+ blob_name: str,
+ wasb_conn_id: str = 'wasb_default',
+ load_options: Optional[dict] = None,
+ *args,
+ **kwargs) -> None:
super().__init__(*args, **kwargs)
if load_options is None:
load_options = {}
@@ -52,7 +59,8 @@ class FileToWasbOperator(BaseOperator):
self.wasb_conn_id = wasb_conn_id
self.load_options = load_options
- def execute(self, context):
+ def execute(self,
+ context: Dict[Any, Any]) -> None:
"""Upload a file to Azure Blob Storage."""
hook = WasbHook(wasb_conn_id=self.wasb_conn_id)
self.log.info(
diff --git
a/airflow/providers/microsoft/azure/transfers/oracle_to_azure_data_lake.py
b/airflow/providers/microsoft/azure/transfers/oracle_to_azure_data_lake.py
index 11013d5..65c128c 100644
--- a/airflow/providers/microsoft/azure/transfers/oracle_to_azure_data_lake.py
+++ b/airflow/providers/microsoft/azure/transfers/oracle_to_azure_data_lake.py
@@ -18,6 +18,7 @@
import os
from tempfile import TemporaryDirectory
+from typing import Any, Dict, Optional, Union
import unicodecsv as csv
@@ -44,7 +45,7 @@ class OracleToAzureDataLakeOperator(BaseOperator):
:param sql: SQL query to execute against the Oracle database. (templated)
:type sql: str
:param sql_params: Parameters to use in sql query. (templated)
- :type sql_params: str
+ :type sql_params: Optional[dict]
:param delimiter: field delimiter in the file.
:type delimiter: str
:param encoding: encoding type for the file.
@@ -62,17 +63,18 @@ class OracleToAzureDataLakeOperator(BaseOperator):
@apply_defaults
def __init__(
self,
- filename,
- azure_data_lake_conn_id,
- azure_data_lake_path,
- oracle_conn_id,
- sql,
- sql_params=None,
- delimiter=",",
- encoding="utf-8",
- quotechar='"',
- quoting=csv.QUOTE_MINIMAL,
- *args, **kwargs):
+ filename: str,
+ azure_data_lake_conn_id: str,
+ azure_data_lake_path: str,
+ oracle_conn_id: str,
+ sql: str,
+ sql_params: Optional[dict] = None,
+ delimiter: str = ",",
+ encoding: str = "utf-8",
+ quotechar: str = '"',
+ quoting: str = csv.QUOTE_MINIMAL,
+ *args,
+ **kwargs) -> None:
super().__init__(*args, **kwargs)
if sql_params is None:
sql_params = {}
@@ -87,7 +89,9 @@ class OracleToAzureDataLakeOperator(BaseOperator):
self.quotechar = quotechar
self.quoting = quoting
- def _write_temp_file(self, cursor, path_to_save):
+ def _write_temp_file(self,
+ cursor: Any,
+ path_to_save: Union[str, bytes, int]) -> None:
with open(path_to_save, 'wb') as csvfile:
csv_writer = csv.writer(csvfile, delimiter=self.delimiter,
encoding=self.encoding,
quotechar=self.quotechar,
@@ -96,7 +100,8 @@ class OracleToAzureDataLakeOperator(BaseOperator):
csv_writer.writerows(cursor)
csvfile.flush()
- def execute(self, context):
+ def execute(self,
+ context: Dict[Any, Any]) -> None:
oracle_hook = OracleHook(oracle_conn_id=self.oracle_conn_id)
azure_data_lake_hook = AzureDataLakeHook(
azure_data_lake_conn_id=self.azure_data_lake_conn_id)