jaketf commented on a change in pull request #8550:
URL: https://github.com/apache/airflow/pull/8550#discussion_r419001554



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -583,6 +623,49 @@ def start_template_dataflow(
         jobs_controller.wait_for_done()
         return response["job"]
 
+    @GoogleBaseHook.fallback_to_default_project_id
+    def start_flex_template(
+        self,
+        body: Dict,
+        location: str,
+        project_id: str,
+        on_new_job_id_callback: Optional[Callable[[str], None]] = None
+    ):
+        """
+        Starts flex templates with the Dataflow  pipeline.
+
+        :param body: The request body

Review comment:
       nit: use the more descriptive name `launch_flex_template_parameters`.
   
   If we are not going to construct this body here and document its parameters, then this docstring should include a link to the API spec for this model.
   
   Unfortunately it looks like this endpoint is mentioned [in the flex templates docs](https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates) but not in the [REST API docs](https://cloud.google.com/dataflow/docs/reference/rest).
   
   I've opened an internal docs bug on this.
   
   ~In the meantime, I'm pretty sure this body is just the same model the "old" templates endpoint uses, [LaunchTemplateParameters](https://cloud.google.com/dataflow/docs/reference/rest/v1b3/LaunchTemplateParameters).
   @aaltay can you confirm?~
   
   Looks like `LaunchTemplateParameters` is missing the `container_spec_gcs_path` key, which is necessary for this endpoint.
   Why is this key snake_case in the API while all the others (e.g. `jobName`) are camelCase, anyway?
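   To make the docstring ask concrete, here is a rough sketch of the body shape I'd expect, assuming it really does mirror `LaunchTemplateParameters` plus the container spec path discussed above. Field names and values are illustrative, not confirmed against a published API spec:
   ```python
   # Illustrative only: assumed request body for start_flex_template(), based on
   # LaunchTemplateParameters plus the container spec path mentioned above.
   body = {
       "jobName": "example-flex-template-job",
       # Mixed casing as observed in the flex-templates guide; unclear why this
       # one key is snake_case while the others are camelCase.
       "container_spec_gcs_path": "gs://example-bucket/templates/spec.json",
       "parameters": {
           "inputSubscription": "projects/example-project/subscriptions/example-sub",
           "outputTable": "example-project:example_dataset.example_table",
       },
   }
   ```
   If that is roughly right, documenting it inline (or linking the spec once it exists) would save users a trip to the guide.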

##########
File path: tests/providers/google/cloud/hooks/test_dataflow.py
##########
@@ -984,15 +1034,17 @@ def test_dataflow_job_is_job_running_with_no_job(self):
         self.assertEqual(False, result)
 
     def test_dataflow_job_cancel_job(self):
-        job = {"id": TEST_JOB_ID, "name": TEST_JOB_NAME, "currentState": 
DataflowJobStatus.JOB_STATE_RUNNING}
-
         get_method = (
             self.mock_dataflow.projects.return_value.
             locations.return_value.
             jobs.return_value.
             get
         )
-        get_method.return_value.execute.return_value = job
+        get_method.return_value.execute.side_effect = [
+            {"id": TEST_JOB_ID, "name": TEST_JOB_NAME, "currentState": 
DataflowJobStatus.JOB_STATE_RUNNING},

Review comment:
       Note from the other PR #8553: see how `DataflowJobStatus.JOB_STATE_*` reads as redundant stutter?
   Especially if we allow the user to control failed states, we should remove the stutter.
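   To make the suggestion concrete, something along these lines (purely illustrative, not part of this PR):
   ```python
   # Illustrative only: de-stuttered constants; the API string values stay the same.
   class DataflowJobStatus:
       RUNNING = "JOB_STATE_RUNNING"
       DONE = "JOB_STATE_DONE"
       FAILED = "JOB_STATE_FAILED"
       CANCELLED = "JOB_STATE_CANCELLED"
   ```
   Call sites would then read `DataflowJobStatus.RUNNING` instead of `DataflowJobStatus.JOB_STATE_RUNNING`.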

##########
File path: airflow/providers/google/cloud/operators/dataflow.py
##########
@@ -406,6 +406,71 @@ def on_kill(self) -> None:
             self.hook.cancel_job(job_id=self.job_id, project_id=self.project_id)
 
 
+class DataflowStartFlexTemplateOperator(BaseOperator):
+    """
+    Starts flex templates with the Dataflow  pipeline.
+
+    :param body: The request body

Review comment:
       See the comment on the same param in the hook above.
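   For anyone following along, a hedged sketch of how I'd expect this operator to be used in a DAG, reusing the assumed body shape from the hook comment above (the operator arguments are inferred from the hook signature in this PR, not confirmed):
   ```python
   # Illustrative only: possible DAG usage of the new operator; the body shape and
   # argument names are assumptions based on the hook signature in this PR.
   from airflow import DAG
   from airflow.providers.google.cloud.operators.dataflow import DataflowStartFlexTemplateOperator
   from airflow.utils.dates import days_ago

   with DAG("example_dataflow_flex_template", start_date=days_ago(1), schedule_interval=None) as dag:
       start_flex_template = DataflowStartFlexTemplateOperator(
           task_id="start_flex_template",
           project_id="example-project",
           location="us-central1",
           body={
               "jobName": "example-flex-template-job",
               "container_spec_gcs_path": "gs://example-bucket/templates/spec.json",
               "parameters": {"outputTable": "example-project:example_dataset.example_table"},
           },
       )
   ```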



