This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 4619e9e0ca [AIP-51] Fix calculate_env for non local executor (#28897)
4619e9e0ca is described below
commit 4619e9e0ca1222356dce9b2868d89153a705d60a
Author: Pierre Jeambrun <[email protected]>
AuthorDate: Fri Jan 13 00:29:22 2023 +0100
[AIP-51] Fix calculate_env for non local executor (#28897)
* Fix calculate_env for non local executor
* Add test
---
airflow/cli/commands/standalone_command.py | 2 +-
tests/cli/commands/test_standalone_command.py | 69 +++++++++++++++++++++++++++
2 files changed, 70 insertions(+), 1 deletion(-)
diff --git a/airflow/cli/commands/standalone_command.py b/airflow/cli/commands/standalone_command.py
index 9da2a215c9..660e5cc6df 100644
--- a/airflow/cli/commands/standalone_command.py
+++ b/airflow/cli/commands/standalone_command.py
@@ -158,7 +158,7 @@ class StandaloneCommand:
# Make sure we're using a local executor flavour
executor_class, _ = ExecutorLoader.import_default_executor_cls()
- if executor_class.is_local:
+ if not executor_class.is_local:
if "sqlite" in conf.get("database", "sql_alchemy_conn"):
self.print_output("standalone", "Forcing executor to SequentialExecutor")
env["AIRFLOW__CORE__EXECUTOR"] = executor_constants.SEQUENTIAL_EXECUTOR
diff --git a/tests/cli/commands/test_standalone_command.py b/tests/cli/commands/test_standalone_command.py
new file mode 100644
index 0000000000..ec258c5041
--- /dev/null
+++ b/tests/cli/commands/test_standalone_command.py
@@ -0,0 +1,69 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest import mock
+
+import pytest
+
+from airflow.cli.commands.standalone_command import StandaloneCommand
+from airflow.executors.executor_constants import (
+ CELERY_EXECUTOR,
+ CELERY_KUBERNETES_EXECUTOR,
+ DASK_EXECUTOR,
+ DEBUG_EXECUTOR,
+ KUBERNETES_EXECUTOR,
+ LOCAL_EXECUTOR,
+ LOCAL_KUBERNETES_EXECUTOR,
+ SEQUENTIAL_EXECUTOR,
+)
+
+
+class TestStandaloneCommand:
+ @pytest.mark.parametrize(
+ "conf_executor_name, conf_sql_alchemy_conn, expected_standalone_executor",
+ [
+ (LOCAL_EXECUTOR, "sqlite_conn_string", LOCAL_EXECUTOR),
+ (LOCAL_KUBERNETES_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR),
+ (SEQUENTIAL_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR),
+ (CELERY_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR),
+ (CELERY_KUBERNETES_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR),
+ (DASK_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR),
+ (KUBERNETES_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR),
+ (DEBUG_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR),
+ (LOCAL_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR),
+ (LOCAL_KUBERNETES_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR),
+ (SEQUENTIAL_EXECUTOR, "other_db_conn_string", SEQUENTIAL_EXECUTOR),
+ (CELERY_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR),
+ (CELERY_KUBERNETES_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR),
+ (DASK_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR),
+ (KUBERNETES_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR),
+ (DEBUG_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR),
+ ],
+ )
+ def test_calculate_env(self, conf_executor_name, conf_sql_alchemy_conn, expected_standalone_executor):
+ """Should always force a local executor compatible with the db."""
+ with mock.patch.dict(
+ "os.environ",
+ {
+ "AIRFLOW__CORE__EXECUTOR": conf_executor_name,
+ "AIRFLOW__DATABASE__SQL_ALCHEMY_CONN": conf_sql_alchemy_conn,
+ },
+ ):
+ env = StandaloneCommand().calculate_env()
+ assert env["AIRFLOW__CORE__EXECUTOR"] == expected_standalone_executor