This is an automated email from the ASF dual-hosted git repository.
onikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new aa4858d85a AIP-51 - Misc. Compatibility Checks (#28375)
aa4858d85a is described below
commit aa4858d85a00759a4a84bdd5fb3fe6cec196831e
Author: Robert Karish <[email protected]>
AuthorDate: Thu Jan 5 13:25:08 2023 -0500
AIP-51 - Misc. Compatibility Checks (#28375)
* Remove pickling executor coupling from backfill_job.py (apache#27930)
* Remove pickling executor coupling from scheduler_job.py (apache#27930)
* Remove executor coupling from sentry.py, add tests for supports_sentry
Executor attribute (apache#27930)
* Use default executor helper method and rework pickling support attribute
(apache#27930)
---
airflow/executors/base_executor.py | 2 ++
airflow/executors/celery_executor.py | 1 +
airflow/executors/celery_kubernetes_executor.py | 3 +++
airflow/executors/dask_executor.py | 2 ++
airflow/executors/executor_loader.py | 2 ++
airflow/executors/local_executor.py | 1 +
airflow/executors/local_kubernetes_executor.py | 3 +++
airflow/executors/sequential_executor.py | 1 +
airflow/jobs/backfill_job.py | 9 +++------
airflow/jobs/scheduler_job.py | 6 ++++--
airflow/sentry.py | 6 ++++--
tests/executors/test_base_executor.py | 8 ++++++++
tests/executors/test_celery_executor.py | 7 +++++++
tests/executors/test_celery_kubernetes_executor.py | 6 ++++++
tests/executors/test_dask_executor.py | 6 ++++++
tests/executors/test_kubernetes_executor.py | 6 ++++++
tests/executors/test_local_executor.py | 6 ++++++
tests/executors/test_local_kubernetes_executor.py | 6 ++++++
tests/executors/test_sequential_executor.py | 6 ++++++
19 files changed, 77 insertions(+), 10 deletions(-)
diff --git a/airflow/executors/base_executor.py
b/airflow/executors/base_executor.py
index 3b50685315..a56b20ec84 100644
--- a/airflow/executors/base_executor.py
+++ b/airflow/executors/base_executor.py
@@ -106,6 +106,8 @@ class BaseExecutor(LoggingMixin):
"""
supports_ad_hoc_ti_run: bool = False
+ supports_pickling: bool = True
+ supports_sentry: bool = False
job_id: None | int | str = None
callback_sink: BaseCallbackSink | None = None
diff --git a/airflow/executors/celery_executor.py
b/airflow/executors/celery_executor.py
index 125c2791f2..ec7e9f008d 100644
--- a/airflow/executors/celery_executor.py
+++ b/airflow/executors/celery_executor.py
@@ -228,6 +228,7 @@ class CeleryExecutor(BaseExecutor):
"""
supports_ad_hoc_ti_run: bool = True
+ supports_sentry: bool = True
def __init__(self):
super().__init__()
diff --git a/airflow/executors/celery_kubernetes_executor.py
b/airflow/executors/celery_kubernetes_executor.py
index a919757bba..474c6d4a8b 100644
--- a/airflow/executors/celery_kubernetes_executor.py
+++ b/airflow/executors/celery_kubernetes_executor.py
@@ -39,6 +39,9 @@ class CeleryKubernetesExecutor(LoggingMixin):
"""
supports_ad_hoc_ti_run: bool = True
+ supports_pickling: bool = True
+ supports_sentry: bool = False
+
callback_sink: BaseCallbackSink | None = None
KUBERNETES_QUEUE = conf.get("celery_kubernetes_executor",
"kubernetes_queue")
diff --git a/airflow/executors/dask_executor.py
b/airflow/executors/dask_executor.py
index 3dbd071bef..0114933d0c 100644
--- a/airflow/executors/dask_executor.py
+++ b/airflow/executors/dask_executor.py
@@ -43,6 +43,8 @@ _UNDEFINED_QUEUES = {None, "default"}
class DaskExecutor(BaseExecutor):
"""DaskExecutor submits tasks to a Dask Distributed cluster."""
+ supports_pickling: bool = False
+
def __init__(self, cluster_address=None):
super().__init__(parallelism=0)
if cluster_address is None:
diff --git a/airflow/executors/executor_loader.py
b/airflow/executors/executor_loader.py
index 1c440dbfbf..6caad10c54 100644
--- a/airflow/executors/executor_loader.py
+++ b/airflow/executors/executor_loader.py
@@ -169,6 +169,8 @@ class ExecutorLoader:
return local_kubernetes_executor_cls(local_executor,
kubernetes_executor)
+# This tuple is deprecated due to AIP-51 and is no longer used in core Airflow.
+# TODO: Remove in Airflow 3.0
UNPICKLEABLE_EXECUTORS = (
LOCAL_EXECUTOR,
SEQUENTIAL_EXECUTOR,
diff --git a/airflow/executors/local_executor.py
b/airflow/executors/local_executor.py
index 26e62a49e2..79f7547567 100644
--- a/airflow/executors/local_executor.py
+++ b/airflow/executors/local_executor.py
@@ -206,6 +206,7 @@ class LocalExecutor(BaseExecutor):
"""
is_local: bool = True
+ supports_pickling: bool = False
def __init__(self, parallelism: int = PARALLELISM):
super().__init__(parallelism=parallelism)
diff --git a/airflow/executors/local_kubernetes_executor.py
b/airflow/executors/local_kubernetes_executor.py
index 948f052c51..b582b37e8a 100644
--- a/airflow/executors/local_kubernetes_executor.py
+++ b/airflow/executors/local_kubernetes_executor.py
@@ -39,6 +39,9 @@ class LocalKubernetesExecutor(LoggingMixin):
"""
supports_ad_hoc_ti_run: bool = True
+ supports_pickling: bool = False
+ supports_sentry: bool = False
+
callback_sink: BaseCallbackSink | None = None
KUBERNETES_QUEUE = conf.get("local_kubernetes_executor",
"kubernetes_queue")
diff --git a/airflow/executors/sequential_executor.py
b/airflow/executors/sequential_executor.py
index e723113725..466c9ea4aa 100644
--- a/airflow/executors/sequential_executor.py
+++ b/airflow/executors/sequential_executor.py
@@ -44,6 +44,7 @@ class SequentialExecutor(BaseExecutor):
SequentialExecutor alongside sqlite as you first install it.
"""
+ supports_pickling: bool = False
is_local: bool = True
def __init__(self):
diff --git a/airflow/jobs/backfill_job.py b/airflow/jobs/backfill_job.py
index af054e2f9f..ee37b9d510 100644
--- a/airflow/jobs/backfill_job.py
+++ b/airflow/jobs/backfill_job.py
@@ -35,7 +35,6 @@ from airflow.exceptions import (
PoolNotFound,
TaskConcurrencyLimitReached,
)
-from airflow.executors import executor_constants
from airflow.executors.executor_loader import ExecutorLoader
from airflow.jobs.base_job import BaseJob
from airflow.models import DAG, DagPickle
@@ -845,11 +844,9 @@ class BackfillJob(BaseJob):
# picklin'
pickle_id = None
- if not self.donot_pickle and self.executor_class not in (
- executor_constants.LOCAL_EXECUTOR,
- executor_constants.SEQUENTIAL_EXECUTOR,
- executor_constants.DASK_EXECUTOR,
- ):
+ executor_class, _ = ExecutorLoader.import_default_executor_cls()
+
+ if not self.donot_pickle and executor_class.supports_pickling:
pickle = DagPickle(self.dag)
session.add(pickle)
session.commit()
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 4c4bdae79b..5977a0cc9b 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -41,7 +41,7 @@ from airflow.callbacks.callback_requests import
DagCallbackRequest, SlaCallbackRequest
from airflow.callbacks.pipe_callback_sink import PipeCallbackSink
from airflow.configuration import conf
from airflow.exceptions import RemovedInAirflow3Warning
-from airflow.executors.executor_loader import UNPICKLEABLE_EXECUTORS
+from airflow.executors.executor_loader import ExecutorLoader
from airflow.jobs.base_job import BaseJob
from airflow.models.dag import DAG, DagModel
from airflow.models.dagbag import DagBag
@@ -720,8 +720,10 @@ class SchedulerJob(BaseJob):
self.log.info("Starting the scheduler")
+ executor_class, _ = ExecutorLoader.import_default_executor_cls()
+
# DAGs can be pickled for easier remote execution by some executors
- pickle_dags = self.do_pickle and self.executor_class not in
UNPICKLEABLE_EXECUTORS
+ pickle_dags = self.do_pickle and executor_class.supports_pickling
self.log.info("Processing each file at most %s times",
self.num_times_parse_dags)
diff --git a/airflow/sentry.py b/airflow/sentry.py
index e76fe1db1b..1ae1b35c52 100644
--- a/airflow/sentry.py
+++ b/airflow/sentry.py
@@ -23,6 +23,7 @@ from functools import wraps
from typing import TYPE_CHECKING
from airflow.configuration import conf
+from airflow.executors.executor_loader import ExecutorLoader
from airflow.utils.session import find_session_idx, provide_session
from airflow.utils.state import State
@@ -82,14 +83,15 @@ if conf.getboolean("sentry", "sentry_on", fallback=False):
def __init__(self):
"""Initialize the Sentry SDK."""
ignore_logger("airflow.task")
- executor_name = conf.get("core", "EXECUTOR")
sentry_flask = FlaskIntegration()
# LoggingIntegration is set by default.
integrations = [sentry_flask]
- if executor_name == "CeleryExecutor":
+ executor_class, _ = ExecutorLoader.import_default_executor_cls()
+
+ if executor_class.supports_sentry:
from sentry_sdk.integrations.celery import CeleryIntegration
sentry_celery = CeleryIntegration()
diff --git a/tests/executors/test_base_executor.py
b/tests/executors/test_base_executor.py
index c88bd333dc..af2894f891 100644
--- a/tests/executors/test_base_executor.py
+++ b/tests/executors/test_base_executor.py
@@ -32,6 +32,14 @@ from airflow.utils import timezone
from airflow.utils.state import State
+def test_supports_sentry():
+ assert not BaseExecutor.supports_sentry
+
+
+def test_supports_pickling():
+ assert BaseExecutor.supports_pickling
+
+
def test_is_local_default_value():
assert not BaseExecutor.is_local
diff --git a/tests/executors/test_celery_executor.py
b/tests/executors/test_celery_executor.py
index 3a9aab576d..3074cb32c2 100644
--- a/tests/executors/test_celery_executor.py
+++ b/tests/executors/test_celery_executor.py
@@ -35,6 +35,7 @@ from parameterized import parameterized
from airflow.configuration import conf
from airflow.executors import celery_executor
+from airflow.executors.celery_executor import CeleryExecutor
from airflow.models.baseoperator import BaseOperator
from airflow.models.dag import DAG
from airflow.models.taskinstance import TaskInstance, TaskInstanceKey
@@ -98,6 +99,12 @@ class TestCeleryExecutor:
db.clear_db_runs()
db.clear_db_jobs()
+ def test_supports_pickling(self):
+ assert CeleryExecutor.supports_pickling
+
+ def test_supports_sentry(self):
+ assert CeleryExecutor.supports_sentry
+
@pytest.mark.quarantined
@pytest.mark.backend("mysql", "postgres")
def test_exception_propagation(self):
diff --git a/tests/executors/test_celery_kubernetes_executor.py
b/tests/executors/test_celery_kubernetes_executor.py
index c6a19e0d7c..4233cf580d 100644
--- a/tests/executors/test_celery_kubernetes_executor.py
+++ b/tests/executors/test_celery_kubernetes_executor.py
@@ -31,6 +31,12 @@ KUBERNETES_QUEUE = CeleryKubernetesExecutor.KUBERNETES_QUEUE
class TestCeleryKubernetesExecutor:
+ def test_supports_pickling(self):
+ assert CeleryKubernetesExecutor.supports_pickling
+
+ def test_supports_sentry(self):
+ assert not CeleryKubernetesExecutor.supports_sentry
+
def test_is_local_default_value(self):
assert not CeleryKubernetesExecutor.is_local
diff --git a/tests/executors/test_dask_executor.py
b/tests/executors/test_dask_executor.py
index faeaef49a7..a5514d8db0 100644
--- a/tests/executors/test_dask_executor.py
+++ b/tests/executors/test_dask_executor.py
@@ -90,6 +90,12 @@ class TestDaskExecutor(TestBaseDask):
self.dagbag = DagBag(include_examples=True)
self.cluster = LocalCluster()
+ def test_supports_pickling(self):
+ assert not DaskExecutor.supports_pickling
+
+ def test_supports_sentry(self):
+ assert not DaskExecutor.supports_sentry
+
def test_dask_executor_functions(self):
executor = DaskExecutor(cluster_address=self.cluster.scheduler_address)
self.assert_tasks_on_executor(executor, timeout_executor=120)
diff --git a/tests/executors/test_kubernetes_executor.py
b/tests/executors/test_kubernetes_executor.py
index 1385324d51..048caad8fd 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -1109,6 +1109,12 @@ class TestKubernetesExecutor:
assert ti0.state == State.SCHEDULED
assert ti1.state == State.QUEUED
+ def test_supports_pickling(self):
+ assert KubernetesExecutor.supports_pickling
+
+ def test_supports_sentry(self):
+ assert not KubernetesExecutor.supports_sentry
+
class TestKubernetesJobWatcher:
test_namespace = "airflow"
diff --git a/tests/executors/test_local_executor.py
b/tests/executors/test_local_executor.py
index 7445224bf6..4de9250f10 100644
--- a/tests/executors/test_local_executor.py
+++ b/tests/executors/test_local_executor.py
@@ -31,6 +31,12 @@ class TestLocalExecutor:
TEST_SUCCESS_COMMANDS = 5
+ def test_supports_pickling(self):
+ assert not LocalExecutor.supports_pickling
+
+ def test_supports_sentry(self):
+ assert not LocalExecutor.supports_sentry
+
def test_is_local_default_value(self):
assert LocalExecutor.is_local
diff --git a/tests/executors/test_local_kubernetes_executor.py
b/tests/executors/test_local_kubernetes_executor.py
index 3dfcf09c86..0d3d86bad0 100644
--- a/tests/executors/test_local_kubernetes_executor.py
+++ b/tests/executors/test_local_kubernetes_executor.py
@@ -26,6 +26,12 @@ from airflow.executors.local_kubernetes_executor import
LocalKubernetesExecutor
class TestLocalKubernetesExecutor:
+ def test_supports_pickling(self):
+ assert not LocalKubernetesExecutor.supports_pickling
+
+ def test_supports_sentry(self):
+ assert not LocalKubernetesExecutor.supports_sentry
+
def test_is_local_default_value(self):
assert not LocalKubernetesExecutor.is_local
diff --git a/tests/executors/test_sequential_executor.py
b/tests/executors/test_sequential_executor.py
index ecf7e31447..61cd3ef238 100644
--- a/tests/executors/test_sequential_executor.py
+++ b/tests/executors/test_sequential_executor.py
@@ -23,6 +23,12 @@ from airflow.executors.sequential_executor import
SequentialExecutor
class TestSequentialExecutor:
+ def test_supports_pickling(self):
+ assert not SequentialExecutor.supports_pickling
+
+ def test_supports_sentry(self):
+ assert not SequentialExecutor.supports_sentry
+
def test_is_local_default_value(self):
assert SequentialExecutor.is_local