thesuperzapper opened a new pull request, #31829: URL: https://github.com/apache/airflow/pull/31829
closes https://github.com/apache/airflow/issues/16163

### __WARNING:__ we need to test/discuss the implications of this change before merging!

## What is this PR changing?

This PR very simply changes the [celery config named `task_acks_late`](https://docs.celeryq.dev/en/stable/userguide/configuration.html#task-acks-late) from `True` to `False`.

The basic meaning of this config is:

- `task_acks_late=True`: celery workers [acknowledge tasks LATE](https://docs.celeryq.dev/en/stable/glossary.html#term-late-acknowledgment), AFTER they finish running
- `task_acks_late=False`: celery workers [acknowledge tasks EARLY](https://docs.celeryq.dev/en/stable/glossary.html#term-early-acknowledgment), just BEFORE they start running

## What's the problem?

What's the problem with the behavior of `task_acks_late=True`?

Effectively, it makes Celery re-assign long-running task instances, specifically those that take more than [`visibility_timeout`](https://docs.celeryq.dev/en/4.4.2/getting-started/brokers/redis.html#visibility-timeout) seconds, even if they are still happily running on another worker. This results in some strange behavior, as reported in https://github.com/apache/airflow/issues/16163

When this happens, we get two copies of a single task instance running concurrently. The new copy will realize the old one is still running, and will correctly wait. But now, the Airflow UI will only show the logs for this new "phantom task", which will always be:

```
{taskinstance.py:874} INFO - Dependencies not met for <TaskInstance: ... [running]>, dependency 'Task Instance Not Running' FAILED: Task is in the running state
{taskinstance.py:874} INFO - Dependencies not met for <TaskInstance: ... [running]>, dependency 'Task Instance State' FAILED: Task is in the 'running' state which is not a valid state for execution. The task must be cleared in order to be run.
```

## Why will this fix it?

First, it's important to understand that celery tasks are NOT airflow tasks:

- `"celery task"`: acts like a "transport layer" from the airflow scheduler's perspective, in that the scheduler creates a celery task for each "try" of an airflow task instance (when configured to use the [`CeleryExecutor`](https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/executor/celery.html))
- `"airflow task instance"`: the [concept we all know](https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html#task-instances)

With `task_acks_late=False`, the celery message is acknowledged before the task starts running, so the broker has nothing left to redeliver once `visibility_timeout` elapses.

The airflow scheduler is already designed to handle ["zombie" task instances](https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html#zombie-undead-tasks) by killing task instances that don't "heartbeat" at least once in a period ([`300` seconds, by default](https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#scheduler-zombie-task-threshold)), so we don't want celery taking matters into its own hands.

## Other Thoughts

With this change, `visibility_timeout` becomes effectively irrelevant (as long as it's larger than the `scheduler_zombie_task_threshold`), so we might want to remove the [`AIRFLOW__SCHEDULER__SCHEDULER_ZOMBIE_TASK_THRESHOLD`](https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#scheduler-zombie-task-threshold) config, and set it to some very high value.
- Alternatively, if we set `visibility_timeout` to a number lower than `scheduler_zombie_task_threshold`, there might be cases where a task instance "retry" is avoided (like when a worker crashes), but it would also reintroduce the risk of two task instances running concurrently.
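
To make the redelivery argument above concrete, here is a toy model (not Celery or Airflow code) of when a broker with a visibility timeout would hand a message to a second worker. The function name `would_redeliver` and the durations are illustrative assumptions. The model assumes a broker that redelivers any message left unacknowledged for longer than `visibility_timeout`, and a worker that stays healthy for the whole run.

```python
def would_redeliver(task_runtime_s: float, visibility_timeout_s: float, acks_late: bool) -> bool:
    """Toy model: is the message redelivered while the task is still running?

    Assumes a healthy worker and a broker that redelivers any message that
    stays unacknowledged for longer than visibility_timeout_s.
    """
    if acks_late:
        # Late ack: the message stays unacknowledged for the whole run.
        unacked_for_s = task_runtime_s
    else:
        # Early ack: the message is acknowledged just before the task starts.
        unacked_for_s = 0.0
    return unacked_for_s > visibility_timeout_s


# Illustrative numbers only: an 8-hour task and a 6-hour visibility timeout.
EIGHT_HOURS = 8 * 3600
SIX_HOURS = 6 * 3600

print(would_redeliver(EIGHT_HOURS, SIX_HOURS, acks_late=True))   # True
print(would_redeliver(EIGHT_HOURS, SIX_HOURS, acks_late=False))  # False
```

The model deliberately ignores worker crashes: after an early ack the broker no longer tracks the message, which is why this PR leans on the scheduler's zombie detection for that case.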
