myang-clgx opened a new issue, #24265:
URL: https://github.com/apache/airflow/issues/24265

   ### Apache Airflow Provider(s)
   
   google
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-apache-beam 3.4.0
   apache-airflow-providers-ftp         2.1.2
   apache-airflow-providers-google      7.0.0
   apache-airflow-providers-http        2.1.2
   apache-airflow-providers-imap        2.2.3
   apache-airflow-providers-sqlite      2.1.3
   
   ### Apache Airflow version
   
   2.3.1
   
   ### Operating System
   
   macOS 12.4
   
   ### Deployment
   
   Other
   
   ### Deployment details
   
   Local deployment without Docker.
   
   ### What happened
   
   I try to run a DAG with `DataflowStartFlexTemplateOperator`, as shown below:
   
   ```python
   from datetime import datetime
   
   from airflow.decorators import dag, task
   from airflow.providers.google.cloud.operators.dataflow import (
       DataflowStartFlexTemplateOperator,
   )
   
   
   @task
   def get_main_config():
       ...
       return config
   
   
   @task
   def get_dataflow_node_config(main_json_config: dict):
       ...
       return config
   
   
   # TaskFlow DAG factory; the schedule and start_date here are placeholders.
   @dag(schedule_interval=None, start_date=datetime(2022, 6, 1), catchup=False)
   def custom_dataflow_etl(**kwargs):
       fetch_main_config = get_main_config()
       fetch_dataflow_config_dic = get_dataflow_node_config(fetch_main_config)
       process_dataflow = DataflowStartFlexTemplateOperator(
           task_id="start_dataflow_flex_template",
           body=fetch_dataflow_config_dic,
           do_xcom_push=True,
           location=DATAFLOW_LOCATION,
       )
       fetch_dataflow_config_dic >> process_dataflow
   
   
   custom_dataflow = custom_dataflow_etl()
   ```
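   
   For reference, the `body` passed to `DataflowStartFlexTemplateOperator` follows the Dataflow `projects.locations.flexTemplates.launch` request format. A minimal, hypothetical sketch of the kind of dict `get_dataflow_node_config` returns; every value below is a placeholder, not taken from my actual config:
   
   ```python
   # Hypothetical Flex Template launch body; all names and paths are placeholders.
   def get_dataflow_node_config_example() -> dict:
       return {
           "launchParameter": {
               "jobName": "example-flex-template-job",
               "containerSpecGcsPath": "gs://example-bucket/templates/spec.json",
               "parameters": {
                   "input": "gs://example-bucket/input/*.json",
                   "output": "gs://example-bucket/output/",
               },
           }
       }
   ```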
   
   It raises an exception during the DAG run:
   
   ```
   [2022-06-03, 13:34:29 UTC] {taskinstance.py:1890} ERROR - Task failed with exception
   Traceback (most recent call last):
     File "/Users/myang/opt/miniconda3/envs/airflow-2_3_1/lib/python3.9/site-packages/airflow/providers/google/cloud/operators/dataflow.py", line 773, in execute
       job = self.hook.start_flex_template(
     File "/Users/myang/opt/miniconda3/envs/airflow-2_3_1/lib/python3.9/site-packages/airflow/providers/google/common/hooks/base_google.py", line 439, in inner_wrapper
       return func(self, *args, **kwargs)
     File "/Users/myang/opt/miniconda3/envs/airflow-2_3_1/lib/python3.9/site-packages/airflow/providers/google/cloud/hooks/dataflow.py", line 754, in start_flex_template
       job = response["job"]
   KeyError: 'job'
   ```
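   
   From the traceback, the hook assumes the launch response always contains a `"job"` key. My reading (an assumption on my part, not verified against the provider source) is that when the launch request is rejected, the API returns a payload without `"job"`, so the real error is hidden behind the bare `KeyError`. A sketch of a defensive variant, using a hypothetical `launch_flex_template` helper for illustration:
   
   ```python
   from typing import Any
   
   
   def launch_flex_template(body: dict) -> dict[str, Any]:
       # Hypothetical stand-in for the provider's launch call; in reality this
       # would POST to projects.locations.flexTemplates.launch and return the
       # decoded JSON response.
       return {}  # placeholder: a failed launch may carry no "job" key
   
   
   response = launch_flex_template(body={})  # placeholder body
   if "job" not in response:
       # Surface the whole response so the real failure reason is visible,
       # instead of a bare KeyError: 'job'.
       raise RuntimeError(f"Flex Template launch returned no job: {response}")
   job = response["job"]
   ```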
   
   
   
   
   ### What you think should happen instead
   
   The job should launch successfully: I have tested the same Flex Template with the same parameter values through the Google Cloud CLI, and it executes there without error.
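   
   To compare against Airflow's behavior, here is a minimal standalone sketch that launches a template directly with the google-api-python-client discovery API and prints the raw response; the project, region, and all body values are placeholders, and Application Default Credentials are assumed:
   
   ```python
   from googleapiclient.discovery import build
   
   # Standalone sketch mirroring the CLI test: launch the Flex Template
   # directly and inspect the raw API response.
   body = {
       "launchParameter": {
           "jobName": "example-flex-template-job",
           "containerSpecGcsPath": "gs://example-bucket/templates/spec.json",
           "parameters": {"input": "gs://example-bucket/input/*.json"},
       }
   }
   
   service = build("dataflow", "v1b3", cache_discovery=False)
   response = (
       service.projects()
       .locations()
       .flexTemplates()
       .launch(projectId="example-project", location="us-central1", body=body)
       .execute()
   )
   
   # A successful launch should include a "job" entry; anything else shows
   # why hooks/dataflow.py fails with KeyError: 'job'.
   print(response)
   ```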
   
   ### How to reproduce
   
   Create a DAG that uses `DataflowStartFlexTemplateOperator` as in the snippet under "What happened" above, then trigger a DAG run; the `start_dataflow_flex_template` task fails with `KeyError: 'job'`.
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

