This is an automated email from the ASF dual-hosted git repository.

onikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new aa4858d85a AIP-51 - Misc. Compatibility Checks (#28375)
aa4858d85a is described below

commit aa4858d85a00759a4a84bdd5fb3fe6cec196831e
Author: Robert Karish <[email protected]>
AuthorDate: Thu Jan 5 13:25:08 2023 -0500

    AIP-51 - Misc. Compatibility Checks (#28375)
    
    * Remove pickling executor coupling from backfill_job.py (apache#27930)
    
    * Remove pickling executor coupling from scheduler_job.py (apache#27930)
    
    * Remove executor coupling from sentry.py, add tests for supports_sentry 
Executor attribute (apache#27930)
    
    * Use default executor helper method and rework pickling support attribute 
(apache#27930)
---
 airflow/executors/base_executor.py                 | 2 ++
 airflow/executors/celery_executor.py               | 1 +
 airflow/executors/celery_kubernetes_executor.py    | 3 +++
 airflow/executors/dask_executor.py                 | 2 ++
 airflow/executors/executor_loader.py               | 2 ++
 airflow/executors/local_executor.py                | 1 +
 airflow/executors/local_kubernetes_executor.py     | 3 +++
 airflow/executors/sequential_executor.py           | 1 +
 airflow/jobs/backfill_job.py                       | 9 +++------
 airflow/jobs/scheduler_job.py                      | 6 ++++--
 airflow/sentry.py                                  | 6 ++++--
 tests/executors/test_base_executor.py              | 8 ++++++++
 tests/executors/test_celery_executor.py            | 7 +++++++
 tests/executors/test_celery_kubernetes_executor.py | 6 ++++++
 tests/executors/test_dask_executor.py              | 6 ++++++
 tests/executors/test_kubernetes_executor.py        | 6 ++++++
 tests/executors/test_local_executor.py             | 6 ++++++
 tests/executors/test_local_kubernetes_executor.py  | 6 ++++++
 tests/executors/test_sequential_executor.py        | 6 ++++++
 19 files changed, 77 insertions(+), 10 deletions(-)

diff --git a/airflow/executors/base_executor.py 
b/airflow/executors/base_executor.py
index 3b50685315..a56b20ec84 100644
--- a/airflow/executors/base_executor.py
+++ b/airflow/executors/base_executor.py
@@ -106,6 +106,8 @@ class BaseExecutor(LoggingMixin):
     """
 
     supports_ad_hoc_ti_run: bool = False
+    supports_pickling: bool = True
+    supports_sentry: bool = False
 
     job_id: None | int | str = None
     callback_sink: BaseCallbackSink | None = None
diff --git a/airflow/executors/celery_executor.py 
b/airflow/executors/celery_executor.py
index 125c2791f2..ec7e9f008d 100644
--- a/airflow/executors/celery_executor.py
+++ b/airflow/executors/celery_executor.py
@@ -228,6 +228,7 @@ class CeleryExecutor(BaseExecutor):
     """
 
     supports_ad_hoc_ti_run: bool = True
+    supports_sentry: bool = True
 
     def __init__(self):
         super().__init__()
diff --git a/airflow/executors/celery_kubernetes_executor.py 
b/airflow/executors/celery_kubernetes_executor.py
index a919757bba..474c6d4a8b 100644
--- a/airflow/executors/celery_kubernetes_executor.py
+++ b/airflow/executors/celery_kubernetes_executor.py
@@ -39,6 +39,9 @@ class CeleryKubernetesExecutor(LoggingMixin):
     """
 
     supports_ad_hoc_ti_run: bool = True
+    supports_pickling: bool = True
+    supports_sentry: bool = False
+
     callback_sink: BaseCallbackSink | None = None
 
     KUBERNETES_QUEUE = conf.get("celery_kubernetes_executor", 
"kubernetes_queue")
diff --git a/airflow/executors/dask_executor.py 
b/airflow/executors/dask_executor.py
index 3dbd071bef..0114933d0c 100644
--- a/airflow/executors/dask_executor.py
+++ b/airflow/executors/dask_executor.py
@@ -43,6 +43,8 @@ _UNDEFINED_QUEUES = {None, "default"}
 class DaskExecutor(BaseExecutor):
     """DaskExecutor submits tasks to a Dask Distributed cluster."""
 
+    supports_pickling: bool = False
+
     def __init__(self, cluster_address=None):
         super().__init__(parallelism=0)
         if cluster_address is None:
diff --git a/airflow/executors/executor_loader.py 
b/airflow/executors/executor_loader.py
index 1c440dbfbf..6caad10c54 100644
--- a/airflow/executors/executor_loader.py
+++ b/airflow/executors/executor_loader.py
@@ -169,6 +169,8 @@ class ExecutorLoader:
         return local_kubernetes_executor_cls(local_executor, 
kubernetes_executor)
 
 
+# This tuple is deprecated due to AIP-51 and is no longer used in core Airflow.
+# TODO: Remove in Airflow 3.0
 UNPICKLEABLE_EXECUTORS = (
     LOCAL_EXECUTOR,
     SEQUENTIAL_EXECUTOR,
diff --git a/airflow/executors/local_executor.py 
b/airflow/executors/local_executor.py
index 26e62a49e2..79f7547567 100644
--- a/airflow/executors/local_executor.py
+++ b/airflow/executors/local_executor.py
@@ -206,6 +206,7 @@ class LocalExecutor(BaseExecutor):
     """
 
     is_local: bool = True
+    supports_pickling: bool = False
 
     def __init__(self, parallelism: int = PARALLELISM):
         super().__init__(parallelism=parallelism)
diff --git a/airflow/executors/local_kubernetes_executor.py 
b/airflow/executors/local_kubernetes_executor.py
index 948f052c51..b582b37e8a 100644
--- a/airflow/executors/local_kubernetes_executor.py
+++ b/airflow/executors/local_kubernetes_executor.py
@@ -39,6 +39,9 @@ class LocalKubernetesExecutor(LoggingMixin):
     """
 
     supports_ad_hoc_ti_run: bool = True
+    supports_pickling: bool = False
+    supports_sentry: bool = False
+
     callback_sink: BaseCallbackSink | None = None
 
     KUBERNETES_QUEUE = conf.get("local_kubernetes_executor", 
"kubernetes_queue")
diff --git a/airflow/executors/sequential_executor.py 
b/airflow/executors/sequential_executor.py
index e723113725..466c9ea4aa 100644
--- a/airflow/executors/sequential_executor.py
+++ b/airflow/executors/sequential_executor.py
@@ -44,6 +44,7 @@ class SequentialExecutor(BaseExecutor):
     SequentialExecutor alongside sqlite as you first install it.
     """
 
+    supports_pickling: bool = False
     is_local: bool = True
 
     def __init__(self):
diff --git a/airflow/jobs/backfill_job.py b/airflow/jobs/backfill_job.py
index af054e2f9f..ee37b9d510 100644
--- a/airflow/jobs/backfill_job.py
+++ b/airflow/jobs/backfill_job.py
@@ -35,7 +35,6 @@ from airflow.exceptions import (
     PoolNotFound,
     TaskConcurrencyLimitReached,
 )
-from airflow.executors import executor_constants
 from airflow.executors.executor_loader import ExecutorLoader
 from airflow.jobs.base_job import BaseJob
 from airflow.models import DAG, DagPickle
@@ -845,11 +844,9 @@ class BackfillJob(BaseJob):
         # picklin'
         pickle_id = None
 
-        if not self.donot_pickle and self.executor_class not in (
-            executor_constants.LOCAL_EXECUTOR,
-            executor_constants.SEQUENTIAL_EXECUTOR,
-            executor_constants.DASK_EXECUTOR,
-        ):
+        executor_class, _ = ExecutorLoader.import_default_executor_cls()
+
+        if not self.donot_pickle and executor_class.supports_pickling:
             pickle = DagPickle(self.dag)
             session.add(pickle)
             session.commit()
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 4c4bdae79b..5977a0cc9b 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -41,7 +41,7 @@ from airflow.callbacks.callback_requests import 
DagCallbackRequest, SlaCallbackR
 from airflow.callbacks.pipe_callback_sink import PipeCallbackSink
 from airflow.configuration import conf
 from airflow.exceptions import RemovedInAirflow3Warning
-from airflow.executors.executor_loader import UNPICKLEABLE_EXECUTORS
+from airflow.executors.executor_loader import ExecutorLoader
 from airflow.jobs.base_job import BaseJob
 from airflow.models.dag import DAG, DagModel
 from airflow.models.dagbag import DagBag
@@ -720,8 +720,10 @@ class SchedulerJob(BaseJob):
 
         self.log.info("Starting the scheduler")
 
+        executor_class, _ = ExecutorLoader.import_default_executor_cls()
+
         # DAGs can be pickled for easier remote execution by some executors
-        pickle_dags = self.do_pickle and self.executor_class not in 
UNPICKLEABLE_EXECUTORS
+        pickle_dags = self.do_pickle and executor_class.supports_pickling
 
         self.log.info("Processing each file at most %s times", 
self.num_times_parse_dags)
 
diff --git a/airflow/sentry.py b/airflow/sentry.py
index e76fe1db1b..1ae1b35c52 100644
--- a/airflow/sentry.py
+++ b/airflow/sentry.py
@@ -23,6 +23,7 @@ from functools import wraps
 from typing import TYPE_CHECKING
 
 from airflow.configuration import conf
+from airflow.executors.executor_loader import ExecutorLoader
 from airflow.utils.session import find_session_idx, provide_session
 from airflow.utils.state import State
 
@@ -82,14 +83,15 @@ if conf.getboolean("sentry", "sentry_on", fallback=False):
         def __init__(self):
             """Initialize the Sentry SDK."""
             ignore_logger("airflow.task")
-            executor_name = conf.get("core", "EXECUTOR")
 
             sentry_flask = FlaskIntegration()
 
             # LoggingIntegration is set by default.
             integrations = [sentry_flask]
 
-            if executor_name == "CeleryExecutor":
+            executor_class, _ = ExecutorLoader.import_default_executor_cls()
+
+            if executor_class.supports_sentry:
                 from sentry_sdk.integrations.celery import CeleryIntegration
 
                 sentry_celery = CeleryIntegration()
diff --git a/tests/executors/test_base_executor.py 
b/tests/executors/test_base_executor.py
index c88bd333dc..af2894f891 100644
--- a/tests/executors/test_base_executor.py
+++ b/tests/executors/test_base_executor.py
@@ -32,6 +32,14 @@ from airflow.utils import timezone
 from airflow.utils.state import State
 
 
+def test_supports_sentry():
+    assert not BaseExecutor.supports_sentry
+
+
+def test_supports_pickling():
+    assert BaseExecutor.supports_pickling
+
+
 def test_is_local_default_value():
     assert not BaseExecutor.is_local
 
diff --git a/tests/executors/test_celery_executor.py 
b/tests/executors/test_celery_executor.py
index 3a9aab576d..3074cb32c2 100644
--- a/tests/executors/test_celery_executor.py
+++ b/tests/executors/test_celery_executor.py
@@ -35,6 +35,7 @@ from parameterized import parameterized
 
 from airflow.configuration import conf
 from airflow.executors import celery_executor
+from airflow.executors.celery_executor import CeleryExecutor
 from airflow.models.baseoperator import BaseOperator
 from airflow.models.dag import DAG
 from airflow.models.taskinstance import TaskInstance, TaskInstanceKey
@@ -98,6 +99,12 @@ class TestCeleryExecutor:
         db.clear_db_runs()
         db.clear_db_jobs()
 
+    def test_supports_pickling(self):
+        assert CeleryExecutor.supports_pickling
+
+    def test_supports_sentry(self):
+        assert CeleryExecutor.supports_sentry
+
     @pytest.mark.quarantined
     @pytest.mark.backend("mysql", "postgres")
     def test_exception_propagation(self):
diff --git a/tests/executors/test_celery_kubernetes_executor.py 
b/tests/executors/test_celery_kubernetes_executor.py
index c6a19e0d7c..4233cf580d 100644
--- a/tests/executors/test_celery_kubernetes_executor.py
+++ b/tests/executors/test_celery_kubernetes_executor.py
@@ -31,6 +31,12 @@ KUBERNETES_QUEUE = CeleryKubernetesExecutor.KUBERNETES_QUEUE
 
 
 class TestCeleryKubernetesExecutor:
+    def test_supports_pickling(self):
+        assert CeleryKubernetesExecutor.supports_pickling
+
+    def test_supports_sentry(self):
+        assert not CeleryKubernetesExecutor.supports_sentry
+
     def test_is_local_default_value(self):
         assert not CeleryKubernetesExecutor.is_local
 
diff --git a/tests/executors/test_dask_executor.py 
b/tests/executors/test_dask_executor.py
index faeaef49a7..a5514d8db0 100644
--- a/tests/executors/test_dask_executor.py
+++ b/tests/executors/test_dask_executor.py
@@ -90,6 +90,12 @@ class TestDaskExecutor(TestBaseDask):
         self.dagbag = DagBag(include_examples=True)
         self.cluster = LocalCluster()
 
+    def test_supports_pickling(self):
+        assert not DaskExecutor.supports_pickling
+
+    def test_supports_sentry(self):
+        assert not DaskExecutor.supports_sentry
+
     def test_dask_executor_functions(self):
         executor = DaskExecutor(cluster_address=self.cluster.scheduler_address)
         self.assert_tasks_on_executor(executor, timeout_executor=120)
diff --git a/tests/executors/test_kubernetes_executor.py 
b/tests/executors/test_kubernetes_executor.py
index 1385324d51..048caad8fd 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -1109,6 +1109,12 @@ class TestKubernetesExecutor:
         assert ti0.state == State.SCHEDULED
         assert ti1.state == State.QUEUED
 
+    def test_supports_pickling(self):
+        assert KubernetesExecutor.supports_pickling
+
+    def test_supports_sentry(self):
+        assert not KubernetesExecutor.supports_sentry
+
 
 class TestKubernetesJobWatcher:
     test_namespace = "airflow"
diff --git a/tests/executors/test_local_executor.py 
b/tests/executors/test_local_executor.py
index 7445224bf6..4de9250f10 100644
--- a/tests/executors/test_local_executor.py
+++ b/tests/executors/test_local_executor.py
@@ -31,6 +31,12 @@ class TestLocalExecutor:
 
     TEST_SUCCESS_COMMANDS = 5
 
+    def test_supports_pickling(self):
+        assert not LocalExecutor.supports_pickling
+
+    def test_supports_sentry(self):
+        assert not LocalExecutor.supports_sentry
+
     def test_is_local_default_value(self):
         assert LocalExecutor.is_local
 
diff --git a/tests/executors/test_local_kubernetes_executor.py 
b/tests/executors/test_local_kubernetes_executor.py
index 3dfcf09c86..0d3d86bad0 100644
--- a/tests/executors/test_local_kubernetes_executor.py
+++ b/tests/executors/test_local_kubernetes_executor.py
@@ -26,6 +26,12 @@ from airflow.executors.local_kubernetes_executor import 
LocalKubernetesExecutor
 
 
 class TestLocalKubernetesExecutor:
+    def test_supports_pickling(self):
+        assert not LocalKubernetesExecutor.supports_pickling
+
+    def test_supports_sentry(self):
+        assert not LocalKubernetesExecutor.supports_sentry
+
     def test_is_local_default_value(self):
         assert not LocalKubernetesExecutor.is_local
 
diff --git a/tests/executors/test_sequential_executor.py 
b/tests/executors/test_sequential_executor.py
index ecf7e31447..61cd3ef238 100644
--- a/tests/executors/test_sequential_executor.py
+++ b/tests/executors/test_sequential_executor.py
@@ -23,6 +23,12 @@ from airflow.executors.sequential_executor import 
SequentialExecutor
 
 
 class TestSequentialExecutor:
+    def test_supports_pickling(self):
+        assert not SequentialExecutor.supports_pickling
+
+    def test_supports_sentry(self):
+        assert not SequentialExecutor.supports_sentry
+
     def test_is_local_default_value(self):
         assert SequentialExecutor.is_local
 

Reply via email to