This is an automated email from the ASF dual-hosted git repository.
taragolis pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d20c32f6b7 Refactor: reduce some conditions in providers (#34440)
d20c32f6b7 is described below
commit d20c32f6b75b4d09c537d6902fae4c1be2e714be
Author: Miroslav Šedivý <[email protected]>
AuthorDate: Tue Sep 19 20:17:49 2023 +0000
Refactor: reduce some conditions in providers (#34440)
---
.../providers/google/cloud/triggers/dataproc.py | 27 +++++++++-------------
airflow/providers/microsoft/azure/hooks/cosmos.py | 5 +---
.../providers/singularity/operators/singularity.py | 10 ++++----
3 files changed, 16 insertions(+), 26 deletions(-)
diff --git a/airflow/providers/google/cloud/triggers/dataproc.py b/airflow/providers/google/cloud/triggers/dataproc.py
index adb34cae03..9aec832d1a 100644
--- a/airflow/providers/google/cloud/triggers/dataproc.py
+++ b/airflow/providers/google/cloud/triggers/dataproc.py
@@ -98,11 +98,10 @@ class DataprocSubmitTrigger(DataprocBaseTrigger):
)
state = job.status.state
self.log.info("Dataproc job: %s is in state: %s", self.job_id, state)
- if state in (JobStatus.State.ERROR, JobStatus.State.DONE, JobStatus.State.CANCELLED):
- if state in (JobStatus.State.DONE, JobStatus.State.CANCELLED):
- break
- elif state == JobStatus.State.ERROR:
- raise AirflowException(f"Dataproc job execution failed {self.job_id}")
+ if state in (JobStatus.State.DONE, JobStatus.State.CANCELLED):
+ break
+ elif state == JobStatus.State.ERROR:
+ raise AirflowException(f"Dataproc job execution failed {self.job_id}")
await asyncio.sleep(self.polling_interval_seconds)
yield TriggerEvent({"job_id": self.job_id, "job_state": state})
@@ -316,21 +315,17 @@ class DataprocWorkflowTrigger(DataprocBaseTrigger):
operation = await hook.get_operation(region=self.region, operation_name=self.name)
if operation.done:
if operation.error.message:
- yield TriggerEvent(
- {
- "operation_name": operation.name,
- "operation_done": operation.done,
- "status": "error",
- "message": operation.error.message,
- }
- )
- return
+ status = "error"
+ message = operation.error.message
+ else:
+ status = "success"
+ message = "Operation is successfully ended."
yield TriggerEvent(
{
"operation_name": operation.name,
"operation_done": operation.done,
- "status": "success",
- "message": "Operation is successfully ended.",
+ "status": status,
+ "message": message,
}
)
return
diff --git a/airflow/providers/microsoft/azure/hooks/cosmos.py b/airflow/providers/microsoft/azure/hooks/cosmos.py
index a23bd44035..1f9883899e 100644
--- a/airflow/providers/microsoft/azure/hooks/cosmos.py
+++ b/airflow/providers/microsoft/azure/hooks/cosmos.py
@@ -281,10 +281,7 @@ class AzureCosmosDBHook(BaseHook):
raise AirflowBadRequest("You cannot insert a None document")
# Add document id if isn't found
- if "id" in document:
- if document["id"] is None:
- document["id"] = document_id
- else:
+ if document.get("id") is None:
document["id"] = document_id
created_document = (
diff --git a/airflow/providers/singularity/operators/singularity.py b/airflow/providers/singularity/operators/singularity.py
index dbc2796daa..f2c303ca5c 100644
--- a/airflow/providers/singularity/operators/singularity.py
+++ b/airflow/providers/singularity/operators/singularity.py
@@ -155,9 +155,8 @@ class SingularityOperator(BaseOperator):
self.log.info("Stopping instance %s", self.instance)
self.instance.stop() # type: ignore[attr-defined]
- if self.auto_remove is True:
- if self.auto_remove and os.path.exists(self.image):
- shutil.rmtree(self.image)
+ if self.auto_remove and os.path.exists(self.image):
+ shutil.rmtree(self.image)
# If the container failed, raise the exception
if result["return_code"] != 0:
@@ -179,6 +178,5 @@ class SingularityOperator(BaseOperator):
self.instance.stop()
# If an image exists, clean it up
- if self.auto_remove is True:
- if self.auto_remove and os.path.exists(self.image):
- shutil.rmtree(self.image)
+ if self.auto_remove and os.path.exists(self.image):
+ shutil.rmtree(self.image)