This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 60e6847c18 Use a single statement with multiple contexts instead of
nested statements in core (#33769)
60e6847c18 is described below
commit 60e6847c181959d95789266cc2712cdacaf18cf9
Author: Hussein Awala <[email protected]>
AuthorDate: Sat Aug 26 08:54:54 2023 +0200
Use a single statement with multiple contexts instead of nested
statements in core (#33769)
---
airflow/sensors/bash.py | 75 +++++++++++++++++++++++++------------------------
airflow/utils/db.py | 7 ++---
2 files changed, 41 insertions(+), 41 deletions(-)
diff --git a/airflow/sensors/bash.py b/airflow/sensors/bash.py
index c99421c982..25c4a547b8 100644
--- a/airflow/sensors/bash.py
+++ b/airflow/sensors/bash.py
@@ -68,44 +68,45 @@ class BashSensor(BaseSensorOperator):
"""Execute the bash command in a temporary directory."""
bash_command = self.bash_command
self.log.info("Tmp dir root location: %s", gettempdir())
- with TemporaryDirectory(prefix="airflowtmp") as tmp_dir:
- with NamedTemporaryFile(dir=tmp_dir, prefix=self.task_id) as f:
- f.write(bytes(bash_command, "utf_8"))
- f.flush()
- fname = f.name
- script_location = tmp_dir + "/" + fname
- self.log.info("Temporary script location: %s", script_location)
- self.log.info("Running command: %s", bash_command)
+ with TemporaryDirectory(prefix="airflowtmp") as tmp_dir, NamedTemporaryFile(
+ dir=tmp_dir, prefix=self.task_id
+ ) as f:
+ f.write(bytes(bash_command, "utf_8"))
+ f.flush()
+ fname = f.name
+ script_location = tmp_dir + "/" + fname
+ self.log.info("Temporary script location: %s", script_location)
+ self.log.info("Running command: %s", bash_command)
- with Popen(
- ["bash", fname],
- stdout=PIPE,
- stderr=STDOUT,
- close_fds=True,
- cwd=tmp_dir,
- env=self.env,
- preexec_fn=os.setsid,
- ) as resp:
- if resp.stdout:
- self.log.info("Output:")
- for line in iter(resp.stdout.readline, b""):
- self.log.info(line.decode(self.output_encoding).strip())
- resp.wait()
- self.log.info("Command exited with return code %s", resp.returncode)
+ with Popen(
+ ["bash", fname],
+ stdout=PIPE,
+ stderr=STDOUT,
+ close_fds=True,
+ cwd=tmp_dir,
+ env=self.env,
+ preexec_fn=os.setsid,
+ ) as resp:
+ if resp.stdout:
+ self.log.info("Output:")
+ for line in iter(resp.stdout.readline, b""):
+ self.log.info(line.decode(self.output_encoding).strip())
+ resp.wait()
+ self.log.info("Command exited with return code %s", resp.returncode)
- # zero code means success, the sensor can go green
- if resp.returncode == 0:
- return True
+ # zero code means success, the sensor can go green
+ if resp.returncode == 0:
+ return True
- # we have a retry exit code, sensor retries if return code matches, otherwise error
- elif self.retry_exit_code is not None:
- if resp.returncode == self.retry_exit_code:
- self.log.info("Return code matches retry code, will retry later")
- return False
- else:
- raise AirflowFailException(f"Command exited with return code {resp.returncode}")
-
- # backwards compatibility: sensor retries no matter the error code
- else:
- self.log.info("Non-zero return code and no retry code set, will retry later")
+ # we have a retry exit code, sensor retries if return code matches, otherwise error
+ elif self.retry_exit_code is not None:
+ if resp.returncode == self.retry_exit_code:
+ self.log.info("Return code matches retry code, will retry later")
+ return False
+ else:
+ raise AirflowFailException(f"Command exited with return code {resp.returncode}")
+
+ # backwards compatibility: sensor retries no matter the error code
+ else:
+ self.log.info("Non-zero return code and no retry code set, will retry later")
+ return False
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index a5252f4e70..db4631f148 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -1650,10 +1650,9 @@ def resetdb(session: Session = NEW_SESSION, skip_init: bool = False):
connection = settings.engine.connect()
- with create_global_lock(session=session, lock=DBLocks.MIGRATIONS):
- with connection.begin():
- drop_airflow_models(connection)
- drop_airflow_moved_tables(connection)
+ with create_global_lock(session=session, lock=DBLocks.MIGRATIONS), connection.begin():
+ drop_airflow_models(connection)
+ drop_airflow_moved_tables(connection)
if not skip_init:
initdb(session=session)