myang-clgx opened a new issue, #24265:
URL: https://github.com/apache/airflow/issues/24265
### Apache Airflow Provider(s)
google
### Versions of Apache Airflow Providers
apache-airflow-providers-apache-beam 3.4.0
apache-airflow-providers-ftp 2.1.2
apache-airflow-providers-google 7.0.0
apache-airflow-providers-http 2.1.2
apache-airflow-providers-imap 2.2.3
apache-airflow-providers-sqlite 2.1.3
### Apache Airflow version
2.3.1
### Operating System
macOS 12.4
### Deployment
Other
### Deployment details
Local deployment, without Docker
### What happened
I tried to run a DAG that uses the Dataflow Flex Template operator, as shown below:

    @task
    def get_main_config():
        ...
        return config

    @task
    def get_dataflow_node_config(main_json_config: dict):
        ...
        return config

    def custom_dataflow_etl(**kwargs):
        fetch_main_config = get_main_config()
        fetch_dataflow_config_dic = get_dataflow_node_config(fetch_main_config)
        process_dataflow = DataflowStartFlexTemplateOperator(
            task_id="start_dataflow_flex_template",
            body=fetch_dataflow_config_dic,
            do_xcom_push=True,
            location=DATAFLOW_LOCATION,
        )
        fetch_dataflow_config_dic >> process_dataflow

    custom_dataflow = custom_dataflow_etl()
The task fails with the following exception during the DAG run:

    [2022-06-03, 13:34:29 UTC] {taskinstance.py:1890} ERROR - Task failed with exception
    Traceback (most recent call last):
      File "/Users/myang/opt/miniconda3/envs/airflow-2_3_1/lib/python3.9/site-packages/airflow/providers/google/cloud/operators/dataflow.py", line 773, in execute
        job = self.hook.start_flex_template(
      File "/Users/myang/opt/miniconda3/envs/airflow-2_3_1/lib/python3.9/site-packages/airflow/providers/google/common/hooks/base_google.py", line 439, in inner_wrapper
        return func(self, *args, **kwargs)
      File "/Users/myang/opt/miniconda3/envs/airflow-2_3_1/lib/python3.9/site-packages/airflow/providers/google/cloud/hooks/dataflow.py", line 754, in start_flex_template
        job = response["job"]
    KeyError: 'job'
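
For context, the last frame indexes the launch response directly, so any response that lacks a `job` key surfaces as a bare `KeyError` with no hint about what came back instead. A minimal sketch of a more descriptive check, assuming the response is a plain dict; `extract_job` is a hypothetical helper for illustration, not the provider's code:

```python
def extract_job(response: dict) -> dict:
    """Return the 'job' entry of a launch response, or fail with context.

    Hypothetical helper: assumes the launch response is a plain dict.
    """
    job = response.get("job")
    if job is None:
        # List the keys that did come back so the real cause is easier to spot.
        raise ValueError(
            f"Launch response has no 'job' key; keys present: {sorted(response)}"
        )
    return job
```

Logging the full response at this point would show whether the launch request was rejected or simply returned a different payload.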
### What you think should happen instead
The task should succeed. I launched the same Flex Template with the Google Cloud CLI, using the same test values for the parameters, and the job ran.
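
One thing that may be worth ruling out is the shape of `body`. The Dataflow `flexTemplates.launch` request expects a top-level `launchParameter` object that carries `jobName` and `containerSpecGcsPath`, whereas the CLI takes the job name and template location as separate arguments. The config returned by `get_dataflow_node_config` is not shown here, so this is only a guess at a possible cause. A minimal sketch of a pre-flight check, assuming that config is a plain dict; `missing_launch_fields` is a hypothetical name:

```python
# Fields the launch request needs inside "launchParameter".
REQUIRED_LAUNCH_FIELDS = ("jobName", "containerSpecGcsPath")


def missing_launch_fields(body: dict) -> list:
    """List the required launch fields that are absent or empty in body.

    Hypothetical helper: assumes body is the dict passed to the operator.
    """
    launch = body.get("launchParameter")
    if not isinstance(launch, dict):
        # Without the wrapper object, none of the nested fields can be present.
        return ["launchParameter"]
    return [
        f"launchParameter.{name}"
        for name in REQUIRED_LAUNCH_FIELDS
        if not launch.get(name)
    ]
```

Calling this inside `get_dataflow_node_config` before returning the config would make a malformed body fail early, with the missing field named.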
### How to reproduce
Create a DAG that uses `DataflowStartFlexTemplateOperator`, as shown below:

    @task
    def get_main_config():
        ...
        return config

    @task
    def get_dataflow_node_config(main_json_config: dict):
        ...
        return config

    def custom_dataflow_etl(**kwargs):
        fetch_main_config = get_main_config()
        fetch_dataflow_config_dic = get_dataflow_node_config(fetch_main_config)
        process_dataflow = DataflowStartFlexTemplateOperator(
            task_id="start_dataflow_flex_template",
            body=fetch_dataflow_config_dic,
            do_xcom_push=True,
            location=DATAFLOW_LOCATION,
        )
        fetch_dataflow_config_dic >> process_dataflow

    custom_dataflow = custom_dataflow_etl()
### Anything else
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)