o-nikolas commented on code in PR #28934:
URL: https://github.com/apache/airflow/pull/28934#discussion_r1071544358


##########
airflow/sensors/base.py:
##########
@@ -257,11 +258,14 @@ def _get_next_poke_interval(
 
     def prepare_for_execution(self) -> BaseOperator:
         task = super().prepare_for_execution()
+
         # Sensors in `poke` mode can block execution of DAGs when running
         # with single process executor, thus we change the mode to`reschedule`
         # to allow parallel task being scheduled and executed
-        if conf.get("core", "executor") == "DebugExecutor":
-            self.log.warning("DebugExecutor changes sensor mode to 
'reschedule'.")
+        executor_name = conf.get("core", "executor")
+        executor = ExecutorLoader.load_executor(executor_name)

Review Comment:
   You can just use `import_default_executor_cls` here I reckon



##########
airflow/cli/commands/standalone_command.py:
##########
@@ -159,14 +159,28 @@ def calculate_env(self):
         # Make sure we're using a local executor flavour
         executor_class, _ = ExecutorLoader.import_default_executor_cls()
         if not executor_class.is_local:
-            if "sqlite" in conf.get("database", "sql_alchemy_conn"):
-                self.print_output("standalone", "Forcing executor to 
SequentialExecutor")
-                env["AIRFLOW__CORE__EXECUTOR"] = 
executor_constants.SEQUENTIAL_EXECUTOR
-            else:
-                self.print_output("standalone", "Forcing executor to 
LocalExecutor")
-                env["AIRFLOW__CORE__EXECUTOR"] = 
executor_constants.LOCAL_EXECUTOR
+            env["AIRFLOW__CORE__EXECUTOR"] = self.get_local_executor(
+                database=conf.get("database", "sql_alchemy_conn")
+            )
+            self.print_output("standalone", f"Forcing executor to 
{env['AIRFLOW__CORE__EXECUTOR']}")
         return env
 
+    @classmethod
+    def get_local_executor(cls, database: str) -> str:
+        """
+        Get local executor for standalone command
+        for sqlite we need SEQUENTIAL_EXECUTOR otherwise LOCAL_EXECUTOR.
+        """
+        try:
+            return [
+                executor_name
+                for executor_name in ExecutorLoader.executors.keys()
+                if executor_name != "DaskExecutor"

Review Comment:
   This looks like executor coupling, why are we excluding the Dask Executor 
here specifically? Could there ever be other executors that would need 
excluding in the same way?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to