This is an automated email from the ASF dual-hosted git repository.

taragolis pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new d20c32f6b7 Refactor: reduce some conditions in providers (#34440)
d20c32f6b7 is described below

commit d20c32f6b75b4d09c537d6902fae4c1be2e714be
Author: Miroslav Šedivý <[email protected]>
AuthorDate: Tue Sep 19 20:17:49 2023 +0000

    Refactor: reduce some conditions in providers (#34440)
---
 .../providers/google/cloud/triggers/dataproc.py    | 27 +++++++++-------------
 airflow/providers/microsoft/azure/hooks/cosmos.py  |  5 +---
 .../providers/singularity/operators/singularity.py | 10 ++++----
 3 files changed, 16 insertions(+), 26 deletions(-)

diff --git a/airflow/providers/google/cloud/triggers/dataproc.py b/airflow/providers/google/cloud/triggers/dataproc.py
index adb34cae03..9aec832d1a 100644
--- a/airflow/providers/google/cloud/triggers/dataproc.py
+++ b/airflow/providers/google/cloud/triggers/dataproc.py
@@ -98,11 +98,10 @@ class DataprocSubmitTrigger(DataprocBaseTrigger):
             )
             state = job.status.state
             self.log.info("Dataproc job: %s is in state: %s", self.job_id, state)
-            if state in (JobStatus.State.ERROR, JobStatus.State.DONE, JobStatus.State.CANCELLED):
-                if state in (JobStatus.State.DONE, JobStatus.State.CANCELLED):
-                    break
-                elif state == JobStatus.State.ERROR:
-                    raise AirflowException(f"Dataproc job execution failed {self.job_id}")
+            if state in (JobStatus.State.DONE, JobStatus.State.CANCELLED):
+                break
+            elif state == JobStatus.State.ERROR:
+                raise AirflowException(f"Dataproc job execution failed {self.job_id}")
             await asyncio.sleep(self.polling_interval_seconds)
         yield TriggerEvent({"job_id": self.job_id, "job_state": state})
 
@@ -316,21 +315,17 @@ class DataprocWorkflowTrigger(DataprocBaseTrigger):
                 operation = await hook.get_operation(region=self.region, operation_name=self.name)
                 if operation.done:
                     if operation.error.message:
-                        yield TriggerEvent(
-                            {
-                                "operation_name": operation.name,
-                                "operation_done": operation.done,
-                                "status": "error",
-                                "message": operation.error.message,
-                            }
-                        )
-                        return
+                        status = "error"
+                        message = operation.error.message
+                    else:
+                        status = "success"
+                        message = "Operation is successfully ended."
                     yield TriggerEvent(
                         {
                             "operation_name": operation.name,
                             "operation_done": operation.done,
-                            "status": "success",
-                            "message": "Operation is successfully ended.",
+                            "status": status,
+                            "message": message,
                         }
                     )
                     return
diff --git a/airflow/providers/microsoft/azure/hooks/cosmos.py b/airflow/providers/microsoft/azure/hooks/cosmos.py
index a23bd44035..1f9883899e 100644
--- a/airflow/providers/microsoft/azure/hooks/cosmos.py
+++ b/airflow/providers/microsoft/azure/hooks/cosmos.py
@@ -281,10 +281,7 @@ class AzureCosmosDBHook(BaseHook):
             raise AirflowBadRequest("You cannot insert a None document")
 
         # Add document id if isn't found
-        if "id" in document:
-            if document["id"] is None:
-                document["id"] = document_id
-        else:
+        if document.get("id") is None:
             document["id"] = document_id
 
         created_document = (
diff --git a/airflow/providers/singularity/operators/singularity.py b/airflow/providers/singularity/operators/singularity.py
index dbc2796daa..f2c303ca5c 100644
--- a/airflow/providers/singularity/operators/singularity.py
+++ b/airflow/providers/singularity/operators/singularity.py
@@ -155,9 +155,8 @@ class SingularityOperator(BaseOperator):
         self.log.info("Stopping instance %s", self.instance)
         self.instance.stop()  # type: ignore[attr-defined]
 
-        if self.auto_remove is True:
-            if self.auto_remove and os.path.exists(self.image):
-                shutil.rmtree(self.image)
+        if self.auto_remove and os.path.exists(self.image):
+            shutil.rmtree(self.image)
 
         # If the container failed, raise the exception
         if result["return_code"] != 0:
@@ -179,6 +178,5 @@ class SingularityOperator(BaseOperator):
             self.instance.stop()
 
             # If an image exists, clean it up
-            if self.auto_remove is True:
-                if self.auto_remove and os.path.exists(self.image):
-                    shutil.rmtree(self.image)
+            if self.auto_remove and os.path.exists(self.image):
+                shutil.rmtree(self.image)

Reply via email to