This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 78cd521  Allow fractional seconds for timeout values (#11966)
78cd521 is described below

commit 78cd521435743b878d12eba67fde65ad8b5575d8
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Fri Oct 30 15:45:45 2020 +0000

    Allow fractional seconds for timeout values (#11966)
    
    Python 3.3 added the `setitimer()` function, which unlike `alarm`,
    allows fractional seconds to be specified (it still raises a SIGALRM
    when the timeout expires).
---
 airflow/config_templates/config.yml          | 8 ++++----
 airflow/config_templates/default_airflow.cfg | 4 ++--
 airflow/executors/celery_executor.py         | 2 +-
 airflow/models/dagbag.py                     | 2 +-
 airflow/models/taskinstance.py               | 2 +-
 airflow/sensors/smart_sensor_operator.py     | 6 +++---
 airflow/utils/timeout.py                     | 4 ++--
 7 files changed, 14 insertions(+), 14 deletions(-)

diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 8ffa206..3f69e31 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -237,9 +237,9 @@
       description: |
         How long before timing out a python file import
       version_added: ~
-      type: string
+      type: float
       example: ~
-      default: "30"
+      default: "30.0"
     - name: dagbag_import_error_tracebacks
       description: |
         Should a traceback be shown in the UI for dagbag import errors,
@@ -1413,9 +1413,9 @@
         The number of seconds to wait before timing out ``send_task_to_executor`` or
         ``fetch_celery_task_state`` operations.
       version_added: 1.10.8
-      type: int
+      type: float
       example: ~
-      default: "2"
+      default: "2.0"
     - name: task_track_started
       description: |
         Celery task will report its status as 'started' when the task is executed by a worker.
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 610e663..3585592 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -146,7 +146,7 @@ fernet_key = {FERNET_KEY}
 donot_pickle = True
 
 # How long before timing out a python file import
-dagbag_import_timeout = 30
+dagbag_import_timeout = 30.0
 
 # Should a traceback be shown in the UI for dagbag import errors,
 # instead of just the exception message
@@ -705,7 +705,7 @@ pool = prefork
 
 # The number of seconds to wait before timing out ``send_task_to_executor`` or
 # ``fetch_celery_task_state`` operations.
-operation_timeout = 2
+operation_timeout = 2.0
 
 # Celery task will report its status as 'started' when the task is executed by a worker.
 # This is used in Airflow to keep track of the running tasks and if a Scheduler is restarted
diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py
index e5e5633..9872c5f 100644
--- a/airflow/executors/celery_executor.py
+++ b/airflow/executors/celery_executor.py
@@ -59,7 +59,7 @@ CELERY_FETCH_ERR_MSG_HEADER = 'Error fetching Celery task state'
 
 CELERY_SEND_ERR_MSG_HEADER = 'Error sending Celery task'
 
-OPERATION_TIMEOUT = conf.getint('celery', 'operation_timeout', fallback=2)
+OPERATION_TIMEOUT = conf.getfloat('celery', 'operation_timeout', fallback=2.0)
 
 '''
 To start the celery worker, run the command:
diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py
index 2018dd5..3a1df4e 100644
--- a/airflow/models/dagbag.py
+++ b/airflow/models/dagbag.py
@@ -80,7 +80,7 @@ class DagBag(BaseDagBag, LoggingMixin):
     :type read_dags_from_db: bool
     """
 
-    DAGBAG_IMPORT_TIMEOUT = conf.getint('core', 'DAGBAG_IMPORT_TIMEOUT')
+    DAGBAG_IMPORT_TIMEOUT = conf.getfloat('core', 'DAGBAG_IMPORT_TIMEOUT')
     SCHEDULER_ZOMBIE_TASK_THRESHOLD = conf.getint('scheduler', 'scheduler_zombie_task_threshold')
 
     def __init__(
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 6731609..83d8f0e 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -1231,7 +1231,7 @@ class TaskInstance(Base, LoggingMixin):     # pylint: disable=R0902,R0904
         # if it goes beyond
         if task_copy.execution_timeout:
             try:
-                with timeout(int(task_copy.execution_timeout.total_seconds())):
+                with timeout(task_copy.execution_timeout.total_seconds()):
                     result = task_copy.execute(context=context)
             except AirflowTaskTimeout:
                 task_copy.on_kill()
diff --git a/airflow/sensors/smart_sensor_operator.py b/airflow/sensors/smart_sensor_operator.py
index d6b5076..a461973 100644
--- a/airflow/sensors/smart_sensor_operator.py
+++ b/airflow/sensors/smart_sensor_operator.py
@@ -290,7 +290,7 @@ class SmartSensorOperator(BaseOperator, SkipMixin):
     :type poke_interval: int
     :param smart_sensor_timeout: Time, in seconds before the internal sensor
         job times out if poke_timeout is not defined.
-    :type smart_sensor_timeout: int
+    :type smart_sensor_timeout: float
     :param shard_min: shard code lower bound (inclusive)
     :type shard_min: int
     :param shard_max: shard code upper bound (exclusive)
@@ -299,7 +299,7 @@ class SmartSensorOperator(BaseOperator, SkipMixin):
         exception expires and being cleaned up.
     :type poke_exception_cache_ttl: int
     :param poke_timeout: Time, in seconds before the task times out and fails.
-    :type poke_timeout: int
+    :type poke_timeout: float
     """
 
     ui_color = '#e6f1f2'
@@ -312,7 +312,7 @@ class SmartSensorOperator(BaseOperator, SkipMixin):
                  shard_min=0,
                  shard_max=100000,
                  poke_exception_cache_ttl=600,
-                 poke_timeout=6,
+                 poke_timeout=6.0,
                  *args,
                  **kwargs):
         super().__init__(*args, **kwargs)
diff --git a/airflow/utils/timeout.py b/airflow/utils/timeout.py
index bd88fde..8c5b249 100644
--- a/airflow/utils/timeout.py
+++ b/airflow/utils/timeout.py
@@ -39,14 +39,14 @@ class timeout(LoggingMixin):  # pylint: disable=invalid-name
     def __enter__(self):
         try:
             signal.signal(signal.SIGALRM, self.handle_timeout)
-            signal.alarm(self.seconds)
+            signal.setitimer(signal.ITIMER_REAL, self.seconds)
         except ValueError as e:
             self.log.warning("timeout can't be used in the current context")
             self.log.exception(e)
 
     def __exit__(self, type_, value, traceback):
         try:
-            signal.alarm(0)
+            signal.setitimer(signal.ITIMER_REAL, 0)
         except ValueError as e:
             self.log.warning("timeout can't be used in the current context")
             self.log.exception(e)

Reply via email to