bharanidharan14 commented on code in PR #25302:
URL: https://github.com/apache/airflow/pull/25302#discussion_r934452024


##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -0,0 +1,69 @@
+import asyncio
+
+from airflow import AirflowException
+from airflow.providers.google.cloud.hooks.dataproc import DataprocAsyncHook
+from airflow.triggers.base import BaseTrigger, TriggerEvent
+from google.cloud.dataproc_v1 import JobStatus, Job
+from typing import Any, Dict, Optional, Sequence, Tuple, Union
+
+
+class DataprocBaseTrigger(BaseTrigger):
+    """
+    Trigger that periodically pollls information from Dataproc API to verify 
job status.
+    Implementation leverages asynchronous transport.
+    """
+
+    def __init__(
+        self,
+        job_id: str,
+        project_id: str,
+        region: str,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        delegate_to: Optional[str] = None,
+        pooling_period_seconds: int = 30,
+    ):
+        super().__init__()
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+        self.job_id = job_id
+        self.project_id = project_id
+        self.region = region
+        self.pooling_period_seconds = pooling_period_seconds
+        self.delegate_to = delegate_to
+        self.hook = DataprocAsyncHook(
+            delegate_to=self.delegate_to,
+            gcp_conn_id=self.gcp_conn_id,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+    def serialize(self):
+        return (
+            
"airflow.providers.google.cloud.operators.dataproc.DataprocBaseTrigger",
+            {
+                "job_id": self.job_id,
+                "project_id": self.project_id,
+                "region": self.region,
+                "gcp_conn_id": self.gcp_conn_id,
+                "delegate_to": self.delegate_to,
+                "impersonation_chain": self.impersonation_chain,
+                "pooling_period_seconds": self.pooling_period_seconds,
+            },
+        )
+
+    async def run(self):
+        while True:
+            job = await self.hook.get_job(project_id=self.project_id, 
region=self.region, job_id=self.job_id)
+            state = job.status.state
+            self.log.info("Dataproc job: %s is in state: %s", self.job_id, 
state)
+            if state in (JobStatus.State.ERROR, JobStatus.State.DONE, 
JobStatus.State.CANCELLED):
+                if state in (JobStatus.State.DONE, JobStatus.State.CANCELLED):
+                    break
+                elif state == JobStatus.State.ERROR:
+                    raise AirflowException(f"Dataproc job execution failed 
{self.job_id}")
+                else:

Review Comment:
   I don't think you need this `else` block. The enclosing `if` on line 59 already
restricts the state to `ERROR`, `DONE`, or `CANCELLED`, and the inner branches handle
all three, so no other job status can ever reach it.



##########
docs/apache-airflow-providers-google/operators/cloud/dataproc.rst:
##########
@@ -174,6 +174,14 @@ Example of the configuration for a Spark Job:
     :start-after: [START how_to_cloud_dataproc_spark_config]
     :end-before: [END how_to_cloud_dataproc_spark_config]
 
+Example of the configuration for a Spark Job running in `deferrable mode 
<https://airflow.apache.org/docs/apache-airflow/stable/concepts/deferring.html>`__:
+
+.. exampleinclude:: 
/../../tests/system/providers/google/dataproc/example_dataproc_spark_deferrable.py
+    :language: python
+    :dedent: 0

Review Comment:
   Why `:dedent: 0`? Shouldn't it be `:dedent: 4`?



##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -0,0 +1,69 @@
+import asyncio
+
+from airflow import AirflowException
+from airflow.providers.google.cloud.hooks.dataproc import DataprocAsyncHook
+from airflow.triggers.base import BaseTrigger, TriggerEvent
+from google.cloud.dataproc_v1 import JobStatus, Job
+from typing import Any, Dict, Optional, Sequence, Tuple, Union
+
+
+class DataprocBaseTrigger(BaseTrigger):
+    """
+    Trigger that periodically pollls information from Dataproc API to verify 
job status.
+    Implementation leverages asynchronous transport.
+    """
+
+    def __init__(
+        self,
+        job_id: str,
+        project_id: str,
+        region: str,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        delegate_to: Optional[str] = None,
+        pooling_period_seconds: int = 30,
+    ):
+        super().__init__()
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+        self.job_id = job_id
+        self.project_id = project_id
+        self.region = region
+        self.pooling_period_seconds = pooling_period_seconds
+        self.delegate_to = delegate_to
+        self.hook = DataprocAsyncHook(
+            delegate_to=self.delegate_to,
+            gcp_conn_id=self.gcp_conn_id,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+    def serialize(self):
+        return (
+            
"airflow.providers.google.cloud.operators.dataproc.DataprocBaseTrigger",

Review Comment:
   This class path is wrong; it should be
   `airflow.providers.google.cloud.triggers.dataproc.DataprocBaseTrigger`.



##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -0,0 +1,69 @@
+import asyncio
+
+from airflow import AirflowException
+from airflow.providers.google.cloud.hooks.dataproc import DataprocAsyncHook
+from airflow.triggers.base import BaseTrigger, TriggerEvent
+from google.cloud.dataproc_v1 import JobStatus, Job
+from typing import Any, Dict, Optional, Sequence, Tuple, Union
+
+
+class DataprocBaseTrigger(BaseTrigger):
+    """
+    Trigger that periodically pollls information from Dataproc API to verify 
job status.

Review Comment:
   Typo: `pollls` should be `polls`. Also, the parameter is misspelled as
`pooling_period_seconds`; it should be `polling_period_seconds`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to