This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 78cd521 Allow fractional seconds for timeout values (#11966)
78cd521 is described below
commit 78cd521435743b878d12eba67fde65ad8b5575d8
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Fri Oct 30 15:45:45 2020 +0000
Allow fractional seconds for timeout values (#11966)
Python 3.3 added the `sigitimer()` function, which unlike `alarm`,
allows fractional seconds to be specified (it still raises a SIGALRM
when the timeout expires)
---
airflow/config_templates/config.yml | 8 ++++----
airflow/config_templates/default_airflow.cfg | 4 ++--
airflow/executors/celery_executor.py | 2 +-
airflow/models/dagbag.py | 2 +-
airflow/models/taskinstance.py | 2 +-
airflow/sensors/smart_sensor_operator.py | 6 +++---
airflow/utils/timeout.py | 4 ++--
7 files changed, 14 insertions(+), 14 deletions(-)
diff --git a/airflow/config_templates/config.yml
b/airflow/config_templates/config.yml
index 8ffa206..3f69e31 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -237,9 +237,9 @@
description: |
How long before timing out a python file import
version_added: ~
- type: string
+ type: float
example: ~
- default: "30"
+ default: "30.0"
- name: dagbag_import_error_tracebacks
description: |
Should a traceback be shown in the UI for dagbag import errors,
@@ -1413,9 +1413,9 @@
The number of seconds to wait before timing out
``send_task_to_executor`` or
``fetch_celery_task_state`` operations.
version_added: 1.10.8
- type: int
+ type: float
example: ~
- default: "2"
+ default: "2.0"
- name: task_track_started
description: |
Celery task will report its status as 'started' when the task is
executed by a worker.
diff --git a/airflow/config_templates/default_airflow.cfg
b/airflow/config_templates/default_airflow.cfg
index 610e663..3585592 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -146,7 +146,7 @@ fernet_key = {FERNET_KEY}
donot_pickle = True
# How long before timing out a python file import
-dagbag_import_timeout = 30
+dagbag_import_timeout = 30.0
# Should a traceback be shown in the UI for dagbag import errors,
# instead of just the exception message
@@ -705,7 +705,7 @@ pool = prefork
# The number of seconds to wait before timing out ``send_task_to_executor`` or
# ``fetch_celery_task_state`` operations.
-operation_timeout = 2
+operation_timeout = 2.0
# Celery task will report its status as 'started' when the task is executed by
a worker.
# This is used in Airflow to keep track of the running tasks and if a
Scheduler is restarted
diff --git a/airflow/executors/celery_executor.py
b/airflow/executors/celery_executor.py
index e5e5633..9872c5f 100644
--- a/airflow/executors/celery_executor.py
+++ b/airflow/executors/celery_executor.py
@@ -59,7 +59,7 @@ CELERY_FETCH_ERR_MSG_HEADER = 'Error fetching Celery task
state'
CELERY_SEND_ERR_MSG_HEADER = 'Error sending Celery task'
-OPERATION_TIMEOUT = conf.getint('celery', 'operation_timeout', fallback=2)
+OPERATION_TIMEOUT = conf.getfloat('celery', 'operation_timeout', fallback=2.0)
'''
To start the celery worker, run the command:
diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py
index 2018dd5..3a1df4e 100644
--- a/airflow/models/dagbag.py
+++ b/airflow/models/dagbag.py
@@ -80,7 +80,7 @@ class DagBag(BaseDagBag, LoggingMixin):
:type read_dags_from_db: bool
"""
- DAGBAG_IMPORT_TIMEOUT = conf.getint('core', 'DAGBAG_IMPORT_TIMEOUT')
+ DAGBAG_IMPORT_TIMEOUT = conf.getfloat('core', 'DAGBAG_IMPORT_TIMEOUT')
SCHEDULER_ZOMBIE_TASK_THRESHOLD = conf.getint('scheduler',
'scheduler_zombie_task_threshold')
def __init__(
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 6731609..83d8f0e 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -1231,7 +1231,7 @@ class TaskInstance(Base, LoggingMixin): # pylint:
disable=R0902,R0904
# if it goes beyond
if task_copy.execution_timeout:
try:
- with timeout(int(task_copy.execution_timeout.total_seconds())):
+ with timeout(task_copy.execution_timeout.total_seconds()):
result = task_copy.execute(context=context)
except AirflowTaskTimeout:
task_copy.on_kill()
diff --git a/airflow/sensors/smart_sensor_operator.py
b/airflow/sensors/smart_sensor_operator.py
index d6b5076..a461973 100644
--- a/airflow/sensors/smart_sensor_operator.py
+++ b/airflow/sensors/smart_sensor_operator.py
@@ -290,7 +290,7 @@ class SmartSensorOperator(BaseOperator, SkipMixin):
:type poke_interval: int
:param smart_sensor_timeout: Time, in seconds before the internal sensor
job times out if poke_timeout is not defined.
- :type smart_sensor_timeout: int
+ :type smart_sensor_timeout: float
:param shard_min: shard code lower bound (inclusive)
:type shard_min: int
:param shard_max: shard code upper bound (exclusive)
@@ -299,7 +299,7 @@ class SmartSensorOperator(BaseOperator, SkipMixin):
exception expires and being cleaned up.
:type poke_exception_cache_ttl: int
:param poke_timeout: Time, in seconds before the task times out and fails.
- :type poke_timeout: int
+ :type poke_timeout: float
"""
ui_color = '#e6f1f2'
@@ -312,7 +312,7 @@ class SmartSensorOperator(BaseOperator, SkipMixin):
shard_min=0,
shard_max=100000,
poke_exception_cache_ttl=600,
- poke_timeout=6,
+ poke_timeout=6.0,
*args,
**kwargs):
super().__init__(*args, **kwargs)
diff --git a/airflow/utils/timeout.py b/airflow/utils/timeout.py
index bd88fde..8c5b249 100644
--- a/airflow/utils/timeout.py
+++ b/airflow/utils/timeout.py
@@ -39,14 +39,14 @@ class timeout(LoggingMixin): # pylint: disable=invalid-name
def __enter__(self):
try:
signal.signal(signal.SIGALRM, self.handle_timeout)
- signal.alarm(self.seconds)
+ signal.setitimer(signal.ITIMER_REAL, self.seconds)
except ValueError as e:
self.log.warning("timeout can't be used in the current context")
self.log.exception(e)
def __exit__(self, type_, value, traceback):
try:
- signal.alarm(0)
+ signal.setitimer(signal.ITIMER_REAL, 0)
except ValueError as e:
self.log.warning("timeout can't be used in the current context")
self.log.exception(e)