terekete opened a new issue #13262:
URL: https://github.com/apache/airflow/issues/13262


   
   **Apache Airflow version**:
   - 1.10.9 (Cloud Composer Airflow image)
   
   **Environment**:
   
   - **Cloud provider or hardware configuration**: Cloud Composer
   
   **What happened**:
   The error logs suggest the hook does not recognize the job as a Batch job.
   
   
    ```
    [2020-12-22 16:28:53,445] {taskinstance.py:1135} ERROR - 'type'
    Traceback (most recent call last):
      File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 972, in _run_raw_task
        result = task_copy.execute(context=context)
      File "/usr/local/lib/airflow/airflow/providers/google/cloud/operators/dataflow.py", line 647, in execute
        on_new_job_id_callback=set_current_job_id
      File "/usr/local/lib/airflow/airflow/providers/google/common/hooks/base_google.py", line 383, in inner_wrapper
        return func(self, *args, **kwargs)
      File "/usr/local/lib/airflow/airflow/providers/google/cloud/hooks/dataflow.py", line 804, in start_flex_template
        jobs_controller.wait_for_done()
      File "/usr/local/lib/airflow/airflow/providers/google/cloud/hooks/dataflow.py", line 348, in wait_for_done
        while self._jobs and not all(self._check_dataflow_job_state(job) for job in self._jobs):
      File "/usr/local/lib/airflow/airflow/providers/google/cloud/hooks/dataflow.py", line 348, in <genexpr>
        while self._jobs and not all(self._check_dataflow_job_state(job) for job in self._jobs):
      File "/usr/local/lib/airflow/airflow/providers/google/cloud/hooks/dataflow.py", line 321, in _check_dataflow_job_state
        wait_for_running = job['type'] == DataflowJobType.JOB_TYPE_STREAMING
    KeyError: 'type'
    ```
   
   
   I have specified:
   
   ```
   with models.DAG(
       dag_id="pdc-test",
       start_date=days_ago(1),
       schedule_interval=None,
   ) as dag_flex_template:
       start_flex_template = DataflowStartFlexTemplateOperator(
           task_id="pdc-test",
           body={
               "launchParameter": {
                   "containerSpecGcsPath": GCS_FLEX_TEMPLATE_TEMPLATE_PATH,
                   "jobName": DATAFLOW_FLEX_TEMPLATE_JOB_NAME,
                   "parameters": {
                       "stage": STAGE,
                       "target": TARGET,
                       "path": PATH,
                       "filename": FILENAME,
                       "column": "geometry"
                   },
                   "environment": {
                       "network": NETWORK,
                       "subnetwork": SUBNETWORK,
                       "machineType": "n1-standard-1",
                       "numWorkers": "1",
                       "maxWorkers": "1",
                       "tempLocation": "gs://test-pipelines-work/batch",
                       "workerZone": "northamerica-northeast1",
                       "enableStreamingEngine": "false",
                       "serviceAccountEmail": 
"<number>[email protected]",
                       "ipConfiguration": "WORKER_IP_PRIVATE"
                   },
               }
           },
           location=LOCATION,
           project_id=GCP_PROJECT_ID
        )
    ```
   
   
   **What you expected to happen**:
   
    Expected the DAG to run the Flex Template job successfully.
   
   
    It appears the operator is not handling the input as a batch-type Flex Template: the job's `DataflowJobType` should be `JOB_TYPE_BATCH`, not `JOB_TYPE_STREAMING`, and the hook raises a `KeyError` when the job description has no `type` key.
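
    The failing line reads `job['type']` directly, which raises the `KeyError` above whenever the Dataflow API has not yet populated the job's `type` field. A minimal sketch of a more defensive check follows; it is an illustration only, not the provider's actual code, and the constant values and job-dict shape are assumptions based on the Dataflow job resource:

    ```python
    # Hypothetical defensive variant of the check in _check_dataflow_job_state.
    # Assumption: a freshly launched Flex Template job may not yet report "type".
    JOB_TYPE_STREAMING = "JOB_TYPE_STREAMING"
    JOB_STATE_RUNNING = "JOB_STATE_RUNNING"
    JOB_STATE_DONE = "JOB_STATE_DONE"


    def is_streaming_job(job: dict) -> bool:
        # dict.get() avoids the KeyError seen in the traceback when the
        # Dataflow API response has no "type" key yet.
        return job.get("type") == JOB_TYPE_STREAMING


    def check_job_state(job: dict) -> bool:
        if is_streaming_job(job):
            # Streaming jobs never reach JOB_STATE_DONE, so treat a
            # running streaming job as successfully started.
            return job.get("currentState") == JOB_STATE_RUNNING
        # Batch jobs are done only once they reach JOB_STATE_DONE.
        return job.get("currentState") == JOB_STATE_DONE
    ```

    With a guard like this, a batch job whose `type` has not yet been reported would simply be treated as not-yet-streaming instead of crashing the task.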
   
   **How to reproduce it**:
   1. Create a Batch Flex Template as of 
https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates
    2. Point the code above at your registered template and trigger the DAG.
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]
