filippociceri opened a new issue #15483:
URL: https://github.com/apache/airflow/issues/15483


   **Apache Airflow version**:
   composer-1.16.1-airflow-1.10.15
   
   **Environment**:
   
   - **Cloud provider or hardware configuration**:  Google Composer
   
   **What happened**:
   First, a bit of context. We have a single Airflow instance in its own GCP project, which runs Dataflow jobs on other GCP projects.
   
   Let's call the project that runs Airflow project A, and the project where the Dataflow jobs run project D.
   
   We recently upgraded from 1.10.14 to 1.10.15 (`composer-1.14.2-airflow-1.10.14` to `composer-1.16.1-airflow-1.10.15`) and noticed that jobs were completing successfully in the Dataflow console, but Airflow threw an error when it made the `wait_for_done` call to check whether a Dataflow job had ended. The Dataflow API returned a 403 when retrieving the job state:
   ```
   {taskinstance.py:1152} ERROR - <HttpError 403 when requesting 
https://dataflow.googleapis.com/v1b3/projects/<PROJECT_A>/locations/us-east1/jobs/<JOB_NAME>?alt=json
 returned "(9549b560fdf4d2fe): Permission 'dataflow.jobs.get' denied on 
project: '<PROJECT_A>". Details: "(9549b560fdf4d2fe): Permission 
'dataflow.jobs.get' denied on project: '<PROJECT_A>'">
   ```
   
   **What you expected to happen**:
   
   I noticed that the 403 was thrown when looking up the job state in project A, while I expect this lookup to happen in project D (and therefore NOT to fail, since the associated service account has the correct permissions there, as shown by the fact that it managed to launch the job). I investigated a bit, and this looks like a regression introduced when upgrading to `composer-1.16.1-airflow-1.10.15`.
   
   This version uses an image that automatically installs `apache-airflow-backport-providers-apache-beam==2021.3.13`, which backports the Dataflow operator from Airflow 2.0. The previous version we were using installed `apache-airflow-backport-providers-google==2020.11.23`.
   
   I checked the commits and changes, and noticed that this operator was last 
modified in 
https://github.com/apache/airflow/commit/1872d8719d24f94aeb1dcba9694837070b9884ca.
 Relevant lines from that commit are the following:
   
https://github.com/apache/airflow/blob/1872d8719d24f94aeb1dcba9694837070b9884ca/airflow/providers/google/cloud/operators/dataflow.py#L1147-L1162
   
   
   while these are from the previous version:
   
https://github.com/apache/airflow/blob/70bf307f3894214c523701940b89ac0b991a3a63/airflow/providers/google/cloud/operators/dataflow.py#L965-L976
   
https://github.com/apache/airflow/blob/70bf307f3894214c523701940b89ac0b991a3a63/airflow/providers/google/cloud/hooks/dataflow.py#L613-L644
   
https://github.com/apache/airflow/blob/70bf307f3894214c523701940b89ac0b991a3a63/airflow/providers/google/cloud/hooks/dataflow.py#L965-L972
   
   
   In the previous version, the job was started by calling `start_python_dataflow`, which in turn would call the `_start_dataflow` method, which would then create a local `job_controller` and use it to check whether the job had ended. Throughout this chain of calls, the `project_id` parameter was passed all the way from the initialization of the `DataflowCreatePythonJobOperator` down to the creation of that controller.
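   
   The shape of that flow, as I read the linked code, is roughly the following (a toy model with hypothetical stand-in classes to show how `project_id` was threaded through; it is NOT the real Airflow implementation or its exact signatures):
   
   ```python
   # Toy model of the pre-upgrade call chain: the operator's project_id is
   # passed down to the controller that polls for job completion.
   
   class _DataflowJobsController:
       def __init__(self, project_number, name, location):
           self.project_number = project_number  # project D in our setup
           self.name = name
           self.location = location
   
       def wait_for_done(self):
           # polls dataflow.jobs.get against self.project_number, i.e. project D
           print(f"polling dataflow.jobs.get in {self.project_number}")
   
   
   class DataflowHook:
       def start_python_dataflow(self, job_name, variables, dataflow,
                                 py_options, project_id, location):
           # project_id comes in from the operator...
           self._start_dataflow(job_name, project_id, location)
   
       def _start_dataflow(self, name, project_id, location):
           # ...and is forwarded to the controller that checks for completion
           controller = _DataflowJobsController(
               project_number=project_id, name=name, location=location
           )
           controller.wait_for_done()
   
   
   # The operator passed its own project_id (project D) into the hook:
   DataflowHook().start_python_dataflow(
       job_name="my-job", variables={}, dataflow="job.py", py_options=[],
       project_id="project-d", location="us-east1",
   )
   ```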
   
   In the latest relevant commit, this behavior was changed. The operator receives a `project_id` during initialization and creates the job using the `start_python_pipeline` method, which receives the `project_id` as part of the `variables` parameter. However, the completion of the job is checked by the `dataflow_hook.wait_for_done` call, and the `DataflowHook` used here:
   * does not specify the `project_id` when it is initialized
   * does not specify the `project_id` as a parameter when making the call to check for completion (the `wait_for_done` call)
   
   As a result, it looks like the hook falls back to the default GCP project ID (the one Composer is running in) instead of the one used to create the Dataflow job. This explains why the job launches successfully while the operator fails.
   
   I think that specifying the `project_id` as a parameter in the 
`wait_for_done` call may solve the issue.
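   
   Something along these lines is what I have in mind, as a sketch against the operator code linked above (the surrounding keyword arguments are paraphrased from memory and may not match the current code exactly):
   
   ```python
   # In DataflowCreatePythonJobOperator.execute, the completion check currently
   # looks roughly like this (no project_id, so the hook falls back to the
   # connection's default project, i.e. project A):
   self.dataflow_hook.wait_for_done(
       job_name=job_name,
       location=self.location,
       job_id=self.job_id,
       multiple_jobs=False,
   )
   
   # Passing the operator's own project_id should make the hook poll project D
   # instead:
   self.dataflow_hook.wait_for_done(
       job_name=job_name,
       location=self.location,
       job_id=self.job_id,
       multiple_jobs=False,
       project_id=self.project_id,
   )
   ```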
   
   **How to reproduce it**:
   
   - Instantiate a Composer environment in a new GCP project.
   - Launch a simple Dataflow job in another project.
   
   The Dataflow job will succeed (no errors are visible in the GCP console), but an error will be thrown in the Airflow logs.
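   
   A minimal DAG along these lines should reproduce it (bucket, project, and job names below are placeholders, and the import path and parameters assume the backported `DataflowCreatePythonJobOperator` from the Google provider installed in the Composer image):
   
   ```python
   from datetime import datetime
   
   from airflow import DAG
   from airflow.providers.google.cloud.operators.dataflow import (
       DataflowCreatePythonJobOperator,
   )
   
   with DAG(
       dag_id="dataflow_cross_project_repro",  # placeholder
       start_date=datetime(2021, 4, 1),
       schedule_interval=None,
   ) as dag:
       # Composer (and its default GCP connection) lives in project A,
       # but the Dataflow job is launched in project D.
       launch_job = DataflowCreatePythonJobOperator(
           task_id="launch_job",
           py_file="gs://my-bucket/wordcount.py",  # placeholder pipeline
           job_name="cross-project-repro",
           project_id="project-d",                 # NOT the Composer project
           location="us-east1",
           options={"temp_location": "gs://my-bucket/tmp/"},
       )
   ```
   
   The job itself completes in the Dataflow console of project D, but the task fails with the 403 above when `wait_for_done` polls project A.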
   
   **Note:** I am reporting a 403 because the service account associated with Airflow does not have the correct permissions on project A. I suspect that, even with the correct permissions, you may get another error (maybe a 404, since there will be no job with that ID within the project), but I have no way to test this at the moment.
   
   **Anything else we need to know**:
   
   This problem occurs every time I launch a Dataflow job in a project other than the one Composer is running in.
   

