This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 99f320354b Refactor: consolidate import time in providers (#34402)
99f320354b is described below
commit 99f320354b075fb780e54057d223d2d16ddf08b8
Author: Miroslav Šedivý <[email protected]>
AuthorDate: Wed Oct 4 14:11:56 2023 +0000
Refactor: consolidate import time in providers (#34402)
---
.../providers/alibaba/cloud/operators/analyticdb_spark.py | 4 ++--
airflow/providers/amazon/aws/hooks/batch_client.py | 4 ++--
.../amazon/aws/hooks/elasticache_replication_group.py | 6 +++---
airflow/providers/amazon/aws/hooks/emr.py | 4 ++--
airflow/providers/amazon/aws/hooks/redshift_data.py | 4 ++--
airflow/providers/amazon/aws/hooks/s3.py | 4 ++--
airflow/providers/amazon/aws/operators/appflow.py | 4 ++--
airflow/providers/apache/livy/operators/livy.py | 4 ++--
airflow/providers/elasticsearch/log/es_task_handler.py | 4 ++--
airflow/providers/google/cloud/hooks/cloud_batch.py | 4 ++--
airflow/providers/google/cloud/hooks/datafusion.py | 12 ++++++------
airflow/providers/google/cloud/operators/datafusion.py | 4 ++--
airflow/providers/google/cloud/operators/dataplex.py | 6 +++---
.../providers/google/cloud/operators/dataproc_metastore.py | 6 +++---
.../microsoft/azure/operators/container_instances.py | 4 ++--
tests/providers/amazon/aws/hooks/test_batch_client.py | 8 ++++----
.../azure/operators/test_azure_container_instances.py | 2 +-
tests/system/providers/amazon/aws/utils/__init__.py | 4 ++--
18 files changed, 44 insertions(+), 44 deletions(-)
diff --git a/airflow/providers/alibaba/cloud/operators/analyticdb_spark.py b/airflow/providers/alibaba/cloud/operators/analyticdb_spark.py
index 79834dbe65..106f7583cb 100644
--- a/airflow/providers/alibaba/cloud/operators/analyticdb_spark.py
+++ b/airflow/providers/alibaba/cloud/operators/analyticdb_spark.py
@@ -17,8 +17,8 @@
# under the License.
from __future__ import annotations
+import time
from functools import cached_property
-from time import sleep
from typing import TYPE_CHECKING, Any, Sequence
from deprecated.classic import deprecated
@@ -78,7 +78,7 @@ class AnalyticDBSparkBaseOperator(BaseOperator):
state = self.hook.get_spark_state(app_id)
while AppState(state) not in AnalyticDBSparkHook.TERMINAL_STATES:
self.log.debug("Application with id %s is in state: %s", app_id,
state)
- sleep(self.polling_interval)
+ time.sleep(self.polling_interval)
state = self.hook.get_spark_state(app_id)
self.log.info("Application with id %s terminated with state: %s",
app_id, state)
self.log.info(
diff --git a/airflow/providers/amazon/aws/hooks/batch_client.py b/airflow/providers/amazon/aws/hooks/batch_client.py
index a7c973bf1e..af9d79c1ae 100644
--- a/airflow/providers/amazon/aws/hooks/batch_client.py
+++ b/airflow/providers/amazon/aws/hooks/batch_client.py
@@ -28,7 +28,7 @@ from __future__ import annotations
import itertools
import random
-from time import sleep
+import time
from typing import TYPE_CHECKING, Callable
import botocore.client
@@ -549,7 +549,7 @@ class BatchClientHook(AwsBaseHook):
delay = random.uniform(BatchClientHook.DEFAULT_DELAY_MIN,
BatchClientHook.DEFAULT_DELAY_MAX)
else:
delay = BatchClientHook.add_jitter(delay)
- sleep(delay)
+ time.sleep(delay)
@staticmethod
def exponential_delay(tries: int) -> float:
diff --git a/airflow/providers/amazon/aws/hooks/elasticache_replication_group.py b/airflow/providers/amazon/aws/hooks/elasticache_replication_group.py
index 5dd227d7ee..d37f0babba 100644
--- a/airflow/providers/amazon/aws/hooks/elasticache_replication_group.py
+++ b/airflow/providers/amazon/aws/hooks/elasticache_replication_group.py
@@ -17,7 +17,7 @@
# under the License.
from __future__ import annotations
-from time import sleep
+import time
from airflow.exceptions import AirflowException
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
@@ -160,7 +160,7 @@ class ElastiCacheReplicationGroupHook(AwsBaseHook):
self.log.info("Poke retry %s. Sleep time %s seconds. Sleeping...", num_tries, sleep_time)
- sleep(sleep_time)
+ time.sleep(sleep_time)
sleep_time *= exponential_back_off_factor
@@ -240,7 +240,7 @@ class ElastiCacheReplicationGroupHook(AwsBaseHook):
self.log.info("Poke retry %s. Sleep time %s seconds. Sleeping...", num_tries, sleep_time)
- sleep(sleep_time)
+ time.sleep(sleep_time)
sleep_time *= exponential_back_off_factor
diff --git a/airflow/providers/amazon/aws/hooks/emr.py b/airflow/providers/amazon/aws/hooks/emr.py
index 1ff2129277..17f85cc8a5 100644
--- a/airflow/providers/amazon/aws/hooks/emr.py
+++ b/airflow/providers/amazon/aws/hooks/emr.py
@@ -18,8 +18,8 @@
from __future__ import annotations
import json
+import time
import warnings
-from time import sleep
from typing import Any
from botocore.exceptions import ClientError
@@ -509,7 +509,7 @@ class EmrContainerHook(AwsBaseHook):
final_query_state = query_state
break
try_number += 1
- sleep(poll_interval)
+ time.sleep(poll_interval)
return final_query_state
def stop_query(self, job_id: str) -> dict:
diff --git a/airflow/providers/amazon/aws/hooks/redshift_data.py b/airflow/providers/amazon/aws/hooks/redshift_data.py
index 110ef9dad1..f7df0fd744 100644
--- a/airflow/providers/amazon/aws/hooks/redshift_data.py
+++ b/airflow/providers/amazon/aws/hooks/redshift_data.py
@@ -17,8 +17,8 @@
# under the License.
from __future__ import annotations
+import time
from pprint import pformat
-from time import sleep
from typing import TYPE_CHECKING, Any, Iterable
from airflow.providers.amazon.aws.hooks.base_aws import AwsGenericHook
@@ -127,7 +127,7 @@ class RedshiftDataHook(AwsGenericHook["RedshiftDataAPIServiceClient"]):
)
else:
self.log.info("Query %s", status)
- sleep(poll_interval)
+ time.sleep(poll_interval)
def get_table_primary_key(
self,
diff --git a/airflow/providers/amazon/aws/hooks/s3.py b/airflow/providers/amazon/aws/hooks/s3.py
index 9eeeaf876b..f36c671c95 100644
--- a/airflow/providers/amazon/aws/hooks/s3.py
+++ b/airflow/providers/amazon/aws/hooks/s3.py
@@ -26,6 +26,7 @@ import logging
import os
import re
import shutil
+import time
import warnings
from contextlib import suppress
from copy import deepcopy
@@ -35,7 +36,6 @@ from inspect import signature
from io import BytesIO
from pathlib import Path
from tempfile import NamedTemporaryFile, gettempdir
-from time import sleep
from typing import TYPE_CHECKING, Any, Callable, TypeVar, cast
from urllib.parse import urlsplit
from uuid import uuid4
@@ -1289,7 +1289,7 @@ class S3Hook(AwsBaseHook):
if not bucket_keys:
break
if retry: # Avoid first loop
- sleep(500)
+ time.sleep(500)
self.delete_objects(bucket=bucket_name, keys=bucket_keys)
diff --git a/airflow/providers/amazon/aws/operators/appflow.py b/airflow/providers/amazon/aws/operators/appflow.py
index 7b7402acde..184fc7fab1 100644
--- a/airflow/providers/amazon/aws/operators/appflow.py
+++ b/airflow/providers/amazon/aws/operators/appflow.py
@@ -16,10 +16,10 @@
# under the License.
from __future__ import annotations
+import time
import warnings
from datetime import datetime, timedelta
from functools import cached_property
-from time import sleep
from typing import TYPE_CHECKING, cast
from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
@@ -107,7 +107,7 @@ class AppflowBaseOperator(BaseOperator):
self._update_flow()
# while schedule flows will pick up the update right away, on-demand flows might use out of date
# info if triggered right after an update, so we need to wait a bit for the DB to be consistent.
- sleep(AppflowBaseOperator.UPDATE_PROPAGATION_TIME)
+ time.sleep(AppflowBaseOperator.UPDATE_PROPAGATION_TIME)
self._run_flow(context)
diff --git a/airflow/providers/apache/livy/operators/livy.py b/airflow/providers/apache/livy/operators/livy.py
index bcf36b50ca..8f4fa75527 100644
--- a/airflow/providers/apache/livy/operators/livy.py
+++ b/airflow/providers/apache/livy/operators/livy.py
@@ -17,8 +17,8 @@
"""This module contains the Apache Livy operator."""
from __future__ import annotations
+import time
from functools import cached_property
-from time import sleep
from typing import TYPE_CHECKING, Any, Sequence
from deprecated.classic import deprecated
@@ -189,7 +189,7 @@ class LivyOperator(BaseOperator):
state = self.hook.get_batch_state(batch_id, retry_args=self.retry_args)
while state not in self.hook.TERMINAL_STATES:
self.log.debug("Batch with id %s is in state: %s", batch_id,
state.value)
- sleep(self._polling_interval)
+ time.sleep(self._polling_interval)
state = self.hook.get_batch_state(batch_id,
retry_args=self.retry_args)
self.log.info("Batch with id %s terminated with state: %s", batch_id,
state.value)
self.hook.dump_batch_logs(batch_id)
diff --git a/airflow/providers/elasticsearch/log/es_task_handler.py b/airflow/providers/elasticsearch/log/es_task_handler.py
index e227ca85ca..070d81543f 100644
--- a/airflow/providers/elasticsearch/log/es_task_handler.py
+++ b/airflow/providers/elasticsearch/log/es_task_handler.py
@@ -21,10 +21,10 @@ import contextlib
import inspect
import logging
import sys
+import time
import warnings
from collections import defaultdict
from operator import attrgetter
-from time import time
from typing import TYPE_CHECKING, Any, Callable, List, Tuple
from urllib.parse import quote, urlparse
@@ -372,7 +372,7 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
def emit(self, record):
if self.handler:
- setattr(record, self.offset_field, int(time() * (10**9)))
+ setattr(record, self.offset_field, int(time.time() * (10**9)))
self.handler.emit(record)
def set_context(self, ti: TaskInstance) -> None:
diff --git a/airflow/providers/google/cloud/hooks/cloud_batch.py b/airflow/providers/google/cloud/hooks/cloud_batch.py
index ef642e4ec9..5ef56e4427 100644
--- a/airflow/providers/google/cloud/hooks/cloud_batch.py
+++ b/airflow/providers/google/cloud/hooks/cloud_batch.py
@@ -19,7 +19,7 @@ from __future__ import annotations
import itertools
import json
-from time import sleep
+import time
from typing import TYPE_CHECKING, Iterable, Sequence
from google.cloud.batch import ListJobsRequest, ListTasksRequest
@@ -152,7 +152,7 @@ class CloudBatchHook(GoogleBaseHook):
):
return job
else:
- sleep(polling_period_seconds)
+ time.sleep(polling_period_seconds)
except Exception as e:
self.log.exception("Exception occurred while checking for job completion.")
raise e
diff --git a/airflow/providers/google/cloud/hooks/datafusion.py b/airflow/providers/google/cloud/hooks/datafusion.py
index b3cfd6ee9b..3c8d1b7453 100644
--- a/airflow/providers/google/cloud/hooks/datafusion.py
+++ b/airflow/providers/google/cloud/hooks/datafusion.py
@@ -20,7 +20,7 @@ from __future__ import annotations
import asyncio
import json
import os
-from time import monotonic, sleep
+import time
from typing import Any, Dict, Sequence
from urllib.parse import quote, urlencode, urljoin
@@ -91,7 +91,7 @@ class DataFusionHook(GoogleBaseHook):
def wait_for_operation(self, operation: dict[str, Any]) -> dict[str, Any]:
"""Waits for long-lasting operation to complete."""
for time_to_wait in exponential_sleep_generator(initial=10,
maximum=120):
- sleep(time_to_wait)
+ time.sleep(time_to_wait)
operation = (
self.get_conn().projects().locations().operations().get(name=operation.get("name")).execute()
)
@@ -115,9 +115,9 @@ class DataFusionHook(GoogleBaseHook):
"""Polls pipeline state and raises an exception if the state fails or times out."""
failure_states = failure_states or FAILURE_STATES
success_states = success_states or SUCCESS_STATES
- start_time = monotonic()
+ start_time = time.monotonic()
current_state = None
- while monotonic() - start_time < timeout:
+ while time.monotonic() - start_time < timeout:
try:
workflow = self.get_pipeline_workflow(
pipeline_name=pipeline_name,
@@ -135,7 +135,7 @@ class DataFusionHook(GoogleBaseHook):
raise AirflowException(
f"Pipeline {pipeline_name} state {current_state} is not one of {success_states}"
)
- sleep(30)
+ time.sleep(30)
# Time is up!
raise AirflowException(
@@ -393,7 +393,7 @@ class DataFusionHook(GoogleBaseHook):
)
except ConflictException as exc:
self.log.info(exc)
- sleep(time_to_wait)
+ time.sleep(time_to_wait)
else:
if response.status == 200:
break
diff --git a/airflow/providers/google/cloud/operators/datafusion.py b/airflow/providers/google/cloud/operators/datafusion.py
index e5d0dec5df..b2149495f9 100644
--- a/airflow/providers/google/cloud/operators/datafusion.py
+++ b/airflow/providers/google/cloud/operators/datafusion.py
@@ -17,7 +17,7 @@
"""This module contains Google DataFusion operators."""
from __future__ import annotations
-from time import sleep
+import time
from typing import TYPE_CHECKING, Any, Sequence
from google.api_core.retry import exponential_sleep_generator
@@ -267,7 +267,7 @@ class CloudDataFusionCreateInstanceOperator(GoogleCloudBaseOperator):
for time_to_wait in exponential_sleep_generator(initial=10,
maximum=120):
if instance["state"] != "CREATING":
break
- sleep(time_to_wait)
+ time.sleep(time_to_wait)
instance = hook.get_instance(
instance_name=self.instance_name, location=self.location,
project_id=self.project_id
)
diff --git a/airflow/providers/google/cloud/operators/dataplex.py b/airflow/providers/google/cloud/operators/dataplex.py
index 3a85961d2d..4fdd9ab970 100644
--- a/airflow/providers/google/cloud/operators/dataplex.py
+++ b/airflow/providers/google/cloud/operators/dataplex.py
@@ -18,7 +18,7 @@
from __future__ import annotations
-from time import sleep
+import time
from typing import TYPE_CHECKING, Any, Sequence
from airflow.exceptions import AirflowException
@@ -165,7 +165,7 @@ class DataplexCreateTaskOperator(GoogleCloudBaseOperator):
)
if task["state"] != "CREATING":
break
- sleep(time_to_wait)
+ time.sleep(time_to_wait)
return Task.to_dict(task)
@@ -534,7 +534,7 @@ class DataplexCreateLakeOperator(GoogleCloudBaseOperator):
)
if lake["state"] != "CREATING":
break
- sleep(time_to_wait)
+ time.sleep(time_to_wait)
DataplexLakeLink.persist(
context=context,
task_instance=self,
diff --git a/airflow/providers/google/cloud/operators/dataproc_metastore.py b/airflow/providers/google/cloud/operators/dataproc_metastore.py
index c27c5dc314..af88728fb6 100644
--- a/airflow/providers/google/cloud/operators/dataproc_metastore.py
+++ b/airflow/providers/google/cloud/operators/dataproc_metastore.py
@@ -18,7 +18,7 @@
"""This module contains Google Dataproc Metastore operators."""
from __future__ import annotations
-from time import sleep
+import time
from typing import TYPE_CHECKING, Sequence
from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
@@ -700,7 +700,7 @@ class DataprocMetastoreExportMetadataOperator(GoogleCloudBaseOperator):
the SDK.
"""
for time_to_wait in exponential_sleep_generator(initial=10,
maximum=120):
- sleep(time_to_wait)
+ time.sleep(time_to_wait)
service = hook.get_service(
region=self.region,
project_id=self.project_id,
@@ -986,7 +986,7 @@ class DataprocMetastoreRestoreServiceOperator(GoogleCloudBaseOperator):
the SDK.
"""
for time_to_wait in exponential_sleep_generator(initial=10,
maximum=120):
- sleep(time_to_wait)
+ time.sleep(time_to_wait)
service = hook.get_service(
region=self.region,
project_id=self.project_id,
diff --git a/airflow/providers/microsoft/azure/operators/container_instances.py b/airflow/providers/microsoft/azure/operators/container_instances.py
index 73c082003e..c104995aff 100644
--- a/airflow/providers/microsoft/azure/operators/container_instances.py
+++ b/airflow/providers/microsoft/azure/operators/container_instances.py
@@ -18,8 +18,8 @@
from __future__ import annotations
import re
+import time
from collections import namedtuple
-from time import sleep
from typing import TYPE_CHECKING, Any, Sequence
from azure.mgmt.containerinstance.models import (
@@ -348,7 +348,7 @@ class AzureContainerInstancesOperator(BaseOperator):
except Exception:
self.log.exception("Exception while getting container groups")
- sleep(1)
+ time.sleep(1)
def _log_last(self, logs: list | None, last_line_logged: Any) -> Any |
None:
if logs:
diff --git a/tests/providers/amazon/aws/hooks/test_batch_client.py b/tests/providers/amazon/aws/hooks/test_batch_client.py
index e501da3c06..39dde3f141 100644
--- a/tests/providers/amazon/aws/hooks/test_batch_client.py
+++ b/tests/providers/amazon/aws/hooks/test_batch_client.py
@@ -427,7 +427,7 @@ class TestBatchClientDelays:
assert result <= width
@mock.patch("airflow.providers.amazon.aws.hooks.batch_client.random.uniform")
- @mock.patch("airflow.providers.amazon.aws.hooks.batch_client.sleep")
+ @mock.patch("airflow.providers.amazon.aws.hooks.batch_client.time.sleep")
def test_delay_defaults(self, mock_sleep, mock_uniform):
assert BatchClientHook.DEFAULT_DELAY_MIN == 1
assert BatchClientHook.DEFAULT_DELAY_MAX == 10
@@ -439,21 +439,21 @@ class TestBatchClientDelays:
mock_sleep.assert_called_once_with(0)
@mock.patch("airflow.providers.amazon.aws.hooks.batch_client.random.uniform")
- @mock.patch("airflow.providers.amazon.aws.hooks.batch_client.sleep")
+ @mock.patch("airflow.providers.amazon.aws.hooks.batch_client.time.sleep")
def test_delay_with_zero(self, mock_sleep, mock_uniform):
self.batch_client.delay(0)
mock_uniform.assert_called_once_with(0, 1) # in add_jitter
mock_sleep.assert_called_once_with(mock_uniform.return_value)
@mock.patch("airflow.providers.amazon.aws.hooks.batch_client.random.uniform")
- @mock.patch("airflow.providers.amazon.aws.hooks.batch_client.sleep")
+ @mock.patch("airflow.providers.amazon.aws.hooks.batch_client.time.sleep")
def test_delay_with_int(self, mock_sleep, mock_uniform):
self.batch_client.delay(5)
mock_uniform.assert_called_once_with(4, 6) # in add_jitter
mock_sleep.assert_called_once_with(mock_uniform.return_value)
@mock.patch("airflow.providers.amazon.aws.hooks.batch_client.random.uniform")
- @mock.patch("airflow.providers.amazon.aws.hooks.batch_client.sleep")
+ @mock.patch("airflow.providers.amazon.aws.hooks.batch_client.time.sleep")
def test_delay_with_float(self, mock_sleep, mock_uniform):
self.batch_client.delay(5.0)
mock_uniform.assert_called_once_with(4.0, 6.0) # in add_jitter
diff --git a/tests/providers/microsoft/azure/operators/test_azure_container_instances.py b/tests/providers/microsoft/azure/operators/test_azure_container_instances.py
index 2984fe60ee..35fdc2b8b9 100644
--- a/tests/providers/microsoft/azure/operators/test_azure_container_instances.py
+++ b/tests/providers/microsoft/azure/operators/test_azure_container_instances.py
@@ -345,7 +345,7 @@ class TestACIOperator:
)
@mock.patch("airflow.providers.microsoft.azure.operators.container_instances.AzureContainerInstanceHook")
- @mock.patch("airflow.providers.microsoft.azure.operators.container_instances.sleep")
+ @mock.patch("airflow.providers.microsoft.azure.operators.container_instances.time.sleep")
def test_execute_correct_sleep_cycle(self, sleep_mock, aci_mock):
expected_cg1 = make_mock_container(state="Running", exit_code=0,
detail_status="test")
expected_cg2 = make_mock_container(state="Terminated", exit_code=0,
detail_status="test")
diff --git a/tests/system/providers/amazon/aws/utils/__init__.py b/tests/system/providers/amazon/aws/utils/__init__.py
index 6515fe6552..b521e3dedd 100644
--- a/tests/system/providers/amazon/aws/utils/__init__.py
+++ b/tests/system/providers/amazon/aws/utils/__init__.py
@@ -20,8 +20,8 @@ import inspect
import json
import logging
import os
+import time
from pathlib import Path
-from time import sleep
from typing import TYPE_CHECKING
from uuid import uuid4
@@ -331,7 +331,7 @@ def _purge_logs(
if not retry or retry_times == 0 or e.response["Error"]["Code"] != "ResourceNotFoundException":
raise e
- sleep(PURGE_LOGS_INTERVAL_PERIOD)
+ time.sleep(PURGE_LOGS_INTERVAL_PERIOD)
_purge_logs(
test_logs=test_logs,
force_delete=force_delete,