akshayapte edited a comment on pull request #13478:
URL: https://github.com/apache/airflow/pull/13478#issuecomment-950399576
Hello @TobKed, I have the same issue when I use the following code:
```python
import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataflow import (
    DataflowStartFlexTemplateOperator,
)

with models.DAG(
    # The id you will see in the DAG airflow page
    "event_consolidator",
    # The interval with which to schedule the DAG
    schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
) as dag:
    start_template_job = DataflowStartFlexTemplateOperator(
        wait_until_finished=True,
        do_xcom_push=True,
        # The task id of your job
        task_id="dataflow_operator_event_consolidation",
        location="europe-west4",
        body={
            "launchParameter": {
                "containerSpecGcsPath": "gs://***/python_command_spec.json",
                "jobName": "discover-event-consolidation-from-airflow",
                "parameters": {
                    "matched_event": '*********',
                    "consolidated_event": 'gs://****/',
                    "setup_file": '/dataflow/template/setup.py',
                },
                "environment": {
                    "subnetwork": 'https://www.googleapis.com/compute/v1/projects/***/regions/europe-west4/subnetworks/**********',
                    "machineType": "n1-standard-1",
                    "numWorkers": "1",
                    "maxWorkers": "1",
                    "tempLocation": "*******" + "/tmp/",
                },
            }
        },
        project_id="trv-hs-src-disco-consol-edge",
    )
```
The task fails with:

```
  File "/usr/local/lib/airflow/airflow/providers/google/cloud/hooks/dataflow.py", line 321, in _check_dataflow_job_state
    wait_for_running = job['type'] == DataflowJobType.JOB_TYPE_STREAMING
KeyError: 'type'
```
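For context, the traceback points at the job-state check in the Dataflow hook: the job object returned by the API apparently has no `type` field at that point, so the dictionary lookup raises `KeyError`. A minimal sketch of that check is below; the method name and constant come from the traceback, while the `.get()` guard is my assumption about how the missing key could be tolerated, not the shipped implementation:

```python
from airflow.providers.google.cloud.hooks.dataflow import DataflowJobType


def _wait_for_running_flag(job: dict) -> bool:
    """Sketch of the check that raises in _check_dataflow_job_state.

    The hook currently does `job['type'] == DataflowJobType.JOB_TYPE_STREAMING`,
    which raises KeyError when the API response has no 'type' field yet.
    Using .get() (an assumption, not the current code) would treat a missing
    'type' as "not streaming" instead of failing the whole task.
    """
    return job.get("type") == DataflowJobType.JOB_TYPE_STREAMING
```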