terekete opened a new issue #13262:
URL: https://github.com/apache/airflow/issues/13262
**Apache Airflow version**: 1.10.9 (Cloud Composer Airflow image)
**Environment**:
- **Cloud provider or hardware configuration**: Cloud Composer
**What happened**:
The error logs indicate that the operator does not recognize the job as a Batch job:
```
[2020-12-22 16:28:53,445] {taskinstance.py:1135} ERROR - 'type'
Traceback (most recent call last):
  File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 972, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/airflow/airflow/providers/google/cloud/operators/dataflow.py", line 647, in execute
    on_new_job_id_callback=set_current_job_id
  File "/usr/local/lib/airflow/airflow/providers/google/common/hooks/base_google.py", line 383, in inner_wrapper
    return func(self, *args, **kwargs)
  File "/usr/local/lib/airflow/airflow/providers/google/cloud/hooks/dataflow.py", line 804, in start_flex_template
    jobs_controller.wait_for_done()
  File "/usr/local/lib/airflow/airflow/providers/google/cloud/hooks/dataflow.py", line 348, in wait_for_done
    while self._jobs and not all(self._check_dataflow_job_state(job) for job in self._jobs):
  File "/usr/local/lib/airflow/airflow/providers/google/cloud/hooks/dataflow.py", line 348, in <genexpr>
    while self._jobs and not all(self._check_dataflow_job_state(job) for job in self._jobs):
  File "/usr/local/lib/airflow/airflow/providers/google/cloud/hooks/dataflow.py", line 321, in _check_dataflow_job_state
    wait_for_running = job['type'] == DataflowJobType.JOB_TYPE_STREAMING
KeyError: 'type'
```
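The failing check boils down to indexing a key that is absent from the job metadata. A minimal sketch of the failure mode (the `job` dict below is an assumed, simplified response shape, not actual Dataflow API output):

```python
# Assumed, simplified shape of a Flex Template batch job's metadata:
# Dataflow may omit the top-level "type" field while the job starts up.
job = {"id": "example-job-id", "currentState": "JOB_STATE_RUNNING"}

try:
    # What _check_dataflow_job_state effectively does:
    wait_for_running = job["type"] == "JOB_TYPE_STREAMING"
except KeyError as exc:
    # Reproduces the error in the traceback above.
    print(f"KeyError: {exc}")
```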
I have specified:
```
with models.DAG(
    dag_id="pdc-test",
    start_date=days_ago(1),
    schedule_interval=None,
) as dag_flex_template:
    start_flex_template = DataflowStartFlexTemplateOperator(
        task_id="pdc-test",
        body={
            "launchParameter": {
                "containerSpecGcsPath": GCS_FLEX_TEMPLATE_TEMPLATE_PATH,
                "jobName": DATAFLOW_FLEX_TEMPLATE_JOB_NAME,
                "parameters": {
                    "stage": STAGE,
                    "target": TARGET,
                    "path": PATH,
                    "filename": FILENAME,
                    "column": "geometry",
                },
                "environment": {
                    "network": NETWORK,
                    "subnetwork": SUBNETWORK,
                    "machineType": "n1-standard-1",
                    "numWorkers": "1",
                    "maxWorkers": "1",
                    "tempLocation": "gs://test-pipelines-work/batch",
                    "workerZone": "northamerica-northeast1",
                    "enableStreamingEngine": "false",
                    "serviceAccountEmail": "<number>[email protected]",
                    "ipConfiguration": "WORKER_IP_PRIVATE",
                },
            }
        },
        location=LOCATION,
        project_id=GCP_PROJECT_ID,
    )
```
**What you expected to happen**:
Expected the DAG to run to completion.
It appears the operator is not handling the input as a batch-type Flex Template: the `DataflowJobType` should be `BATCH`, not `STREAMING`, and the hook should not assume the `type` key is present.
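A possible direction for a fix, as a sketch only (not the actual Airflow patch): use `job.get("type")` instead of `job["type"]` so a missing key falls through to the batch path. The constant values mirror the Dataflow job states; the function below is a simplified stand-in for `_check_dataflow_job_state`, not the hook's real implementation.

```python
# Stand-ins for the DataflowJobType / DataflowJobStatus constants.
JOB_TYPE_STREAMING = "JOB_TYPE_STREAMING"
JOB_STATE_DONE = "JOB_STATE_DONE"
JOB_STATE_RUNNING = "JOB_STATE_RUNNING"

def check_dataflow_job_state(job: dict) -> bool:
    """Defensive sketch: job.get("type") tolerates a missing "type" key,
    so a Flex Template batch job is treated as batch instead of raising
    KeyError."""
    wait_for_running = job.get("type") == JOB_TYPE_STREAMING
    if job.get("currentState") == JOB_STATE_DONE:
        return True  # batch job finished
    if wait_for_running and job.get("currentState") == JOB_STATE_RUNNING:
        return True  # streaming job reached RUNNING, no DONE state expected
    return False     # keep polling
```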
**How to reproduce it**:
1. Create a Batch Flex Template as described at
https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates
2. Point the code above at your registered template and invoke it.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]