mik-laj commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r508048041



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -783,6 +794,77 @@ def cancel_job(
             name=job_name,
             job_id=job_id,
             location=location,
-            poll_sleep=self.poll_sleep
+            poll_sleep=self.poll_sleep,
+            num_retries=self.num_retries,
         )
         jobs_controller.cancel()
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def start_sql_job(
+        self,
+        job_name: str,
+        query: str,
+        options: Dict[str, Any],
+        project_id: str,
+        location: str = DEFAULT_DATAFLOW_LOCATION,
+        on_new_job_id_callback: Optional[Callable[[str], None]] = None
+    ):
+        """
+        Starts Dataflow SQL query.
+
+        :param job_name: The unique name to assign to the Cloud Dataflow job.
+        :type job_name: str
+        :param query: The SQL query to execute.
+        :type query: str
+        :param options: Job parameters to be executed.
+            For more information, look at:
+            `https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query
+            <gcloud beta dataflow sql query>`__
+            command reference
+        :param location: The location of the Dataflow job (for example europe-west1)

Review comment:
   This is an essential feature of Airflow. You can define default arguments that are applied to all operators in a DAG, but this only works if the parameter name is consistent across all operators.
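   ```python
   # Imports assume the Airflow 2.0 Google provider package.
   from airflow import models
   from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateEmptyTableOperator
   from airflow.providers.google.cloud.operators.dataflow import (
       CheckJobRunning,
       DataflowCreateJavaJobOperator,
   )
   from airflow.utils.dates import days_ago
   
   # Placeholder values; replace them with your own bucket, JAR and dataset.
   GCS_TMP = 'gs://my-bucket/temp/'
   GCS_STAGING = 'gs://my-bucket/staging/'
   GCS_OUTPUT = 'gs://my-bucket/output'
   GCS_JAR = 'gs://my-bucket/word-count.jar'
   DATASET_NAME = 'test_dataset'
   
   # Each operator in the DAG receives these values for every
   # constructor parameter with the same name, e.g. `location`.
   default_args = {
       'dataflow_default_options': {
           'tempLocation': GCS_TMP,
           'stagingLocation': GCS_STAGING,
       },
       'location': 'europe-west3'
   }
   
   with models.DAG(
       "example_gcp_dataflow_native_java",
       default_args=default_args,  # Without this, default_args is never applied
       schedule_interval=None,  # Override to match your needs
       start_date=days_ago(1),
       tags=['example'],
   ) as dag_native_java:
       start_java_job = DataflowCreateJavaJobOperator(
           task_id="start-java-job",
           jar=GCS_JAR,
           job_name='{{task.task_id}}',
           options={
               'output': GCS_OUTPUT,
           },
           poll_sleep=10,
           job_class='org.apache.beam.examples.WordCount',
           check_if_running=CheckJobRunning.IgnoreJob,
           location='europe-west3',
       )
   
       # [START howto_operator_bigquery_create_table]
       create_table = BigQueryCreateEmptyTableOperator(
           task_id="create_table",
           dataset_id=DATASET_NAME,
           table_id="test_table",
           schema_fields=[
               {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
               {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
           ],
           # `location` is not passed here; it comes from default_args.
       )
       # [END howto_operator_bigquery_create_table]
   ```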
   In the above example, the tasks `create_table` and `start-java-job` are both executed in the same location, `europe-west3`.
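To make the name-matching point concrete, here is a simplified model of how shared defaults reach operators. It is a hypothetical sketch, not Airflow's actual code: the helper `apply_default_args` and both operator classes are invented for illustration, and `SqlJobOperator` is assumed to call the same setting `region`. A value in `default_args` only fills a constructor parameter with the same name that the caller did not pass explicitly.

```python
import inspect
from typing import Any, Dict


def apply_default_args(cls, default_args: Dict[str, Any], **kwargs: Any):
    """Hypothetical helper: instantiate `cls`, filling every constructor
    parameter that was not passed explicitly from `default_args`,
    matched purely by parameter name."""
    params = inspect.signature(cls.__init__).parameters
    for name, value in default_args.items():
        # Unknown names are ignored; explicit kwargs always win.
        if name in params and name not in kwargs:
            kwargs[name] = value
    return cls(**kwargs)


class JavaJobOperator:
    """Hypothetical operator that names the setting `location`."""

    def __init__(self, task_id: str, location: str = "us-central1"):
        self.task_id = task_id
        self.location = location


class SqlJobOperator:
    """Hypothetical operator that names the same setting `region`."""

    def __init__(self, task_id: str, region: str = "us-central1"):
        self.task_id = task_id
        self.region = region


default_args = {"location": "europe-west3"}
java = apply_default_args(JavaJobOperator, default_args, task_id="java")
sql = apply_default_args(SqlJobOperator, default_args, task_id="sql")
print(java.location)  # europe-west3
print(sql.region)     # us-central1
```

The operator whose parameter is called `location` picks up the shared value, while the one using `region` silently keeps its own default, which is why a single, consistent name matters.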
   
   Dataflow also uses the word "location" in its API to denote this field.
   ![Screenshot 2020-10-19 at 22 36 49](https://user-images.githubusercontent.com/12058428/96508938-9ec58200-125b-11eb-9547-75b27aec7e93.png)
   
   
https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.locations.jobs/get
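For reference, the `projects.locations.jobs.get` method linked above addresses a job by project ID, location and job ID, with `location` as a path segment. The sketch below only builds that REST URL; the helper name `job_get_url` and the sample IDs are hypothetical, and authentication and the actual HTTP request are deliberately left out.

```python
from urllib.parse import quote

DATAFLOW_API = "https://dataflow.googleapis.com/v1b3"


def job_get_url(project_id: str, location: str, job_id: str) -> str:
    """Hypothetical helper: build the URL for projects.locations.jobs.get."""
    # Percent-encode each path segment so that unusual IDs cannot break the path.
    return (
        f"{DATAFLOW_API}/projects/{quote(project_id, safe='')}"
        f"/locations/{quote(location, safe='')}"
        f"/jobs/{quote(job_id, safe='')}"
    )


print(job_get_url("my-project", "europe-west3", "2020-10-19_12_00_00-123"))
```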
   
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
