filippociceri opened a new issue #15483:
URL: https://github.com/apache/airflow/issues/15483
**Apache Airflow version**:
composer-1.16.1-airflow-1.10.15
**Environment**:
- **Cloud provider or hardware configuration**: Google Composer
**What happened**:
First, a bit of context. We have a single Airflow instance in its own GCP
project, which runs Dataflow jobs in other GCP projects.
Let's call the project that runs Airflow "project A", and the project where the
Dataflow jobs run "project D".
We recently upgraded from 1.10.14 to 1.10.15
(`composer-1.14.2-airflow-1.10.14` to `composer-1.16.1-airflow-1.10.15`) and
noticed that jobs were completing successfully in the Dataflow console, but an
error was thrown when Airflow made the `wait_for_done` call to check whether a
Dataflow job had ended. The call to the Dataflow API that retrieves the job
state returned a 403:
```
{taskinstance.py:1152} ERROR - <HttpError 403 when requesting
https://dataflow.googleapis.com/v1b3/projects/<PROJECT_A>/locations/us-east1/jobs/<JOB_NAME>?alt=json
returned "(9549b560fdf4d2fe): Permission 'dataflow.jobs.get' denied on
project: '<PROJECT_A>". Details: "(9549b560fdf4d2fe): Permission
'dataflow.jobs.get' denied on project: '<PROJECT_A>'">
```
**What you expected to happen**:
The 403 is raised when looking up the job state in project A, while I expect
this lookup to happen in project D (and consequently NOT to fail, since the
associated service account has the correct permissions there - it did manage to
launch the job). I investigated a bit, and this looks like a regression
introduced when upgrading to `composer-1.16.1-airflow-1.10.15`.
This version uses an image that automatically installs
`apache-airflow-backport-providers-apache-beam==2021.3.13`, which backports the
Dataflow operator from v2. The image we were using previously installed
`apache-airflow-backport-providers-google==2020.11.23`.
I checked the commits and changes, and noticed that this operator was last
modified in
https://github.com/apache/airflow/commit/1872d8719d24f94aeb1dcba9694837070b9884ca.
Relevant lines from that commit are the following:
https://github.com/apache/airflow/blob/1872d8719d24f94aeb1dcba9694837070b9884ca/airflow/providers/google/cloud/operators/dataflow.py#L1147-L1162
while these are from the previous version:
https://github.com/apache/airflow/blob/70bf307f3894214c523701940b89ac0b991a3a63/airflow/providers/google/cloud/operators/dataflow.py#L965-L976
https://github.com/apache/airflow/blob/70bf307f3894214c523701940b89ac0b991a3a63/airflow/providers/google/cloud/hooks/dataflow.py#L613-L644
https://github.com/apache/airflow/blob/70bf307f3894214c523701940b89ac0b991a3a63/airflow/providers/google/cloud/hooks/dataflow.py#L965-L972
In the previous version, the job was started by calling
`start_python_dataflow`, which in turn called the `_start_dataflow` method,
which created a local `job_controller` and used it to check whether the job
had ended. Throughout this chain of calls, the `project_id` parameter was
passed all the way from the initialization of the
`DataflowCreatePythonJobOperator` down to the creation of the controller that
performed that check.
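To make that concrete, here is a small standalone toy (not actual Airflow code;
all names below are made-up stand-ins) showing how the `project_id` was threaded
down to the controller that polled the job:
```python
# Toy illustration of the old (backport-providers-google 2020.11.23) flow.
# These are made-up stand-ins, not real Airflow classes; they only mirror
# how project_id was threaded through the calls.

class ToyJobsController:
    """Stands in for the controller that polls dataflow.jobs.get."""

    def __init__(self, project_id: str, location: str):
        self.project_id = project_id
        self.location = location

    def wait_for_done(self) -> None:
        # The poll targets whatever project the controller was built with.
        print(f"GET dataflow jobs in project={self.project_id}, location={self.location}")


def toy_start_python_dataflow(project_id: str, location: str) -> None:
    # In the old hook, project_id was an explicit argument here...
    toy_start_dataflow(project_id=project_id, location=location)


def toy_start_dataflow(project_id: str, location: str) -> None:
    # ...and was forwarded to the controller that checks for completion.
    ToyJobsController(project_id=project_id, location=location).wait_for_done()


toy_start_python_dataflow(project_id="project-d", location="us-east1")
# -> GET dataflow jobs in project=project-d, location=us-east1
```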
In the latest relevant commit, this behavior changed. The operator receives a
`project_id` during initialization and creates the job using the
`start_python_pipeline` method, which receives the `project_id` as part of the
`variables` parameter. However, the completion of the job is checked by the
`dataflow_hook.wait_for_done` call, and the `DataflowHook` used there:
* does not receive the `project_id` when it is initialized
* does not pass the `project_id` as a parameter when making the completion
check (the `wait_for_done` call)

As a result, it appears to use the default GCP project ID (the one the Composer
environment runs in) rather than the one used to create the Dataflow job. This
explains why the job launches successfully while the operator fails.
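Continuing the same toy illustration (again, made-up names, not the real
classes), this is roughly what the new flow looks like from the operator's
point of view: the launch picks up the project from the pipeline variables, but
the completion check silently falls back to the hook's default project:
```python
# Toy illustration of the new (backport-providers-apache-beam era) flow.
from typing import Optional

DEFAULT_PROJECT = "project-a"  # the project the Composer environment runs in


class ToyDataflowHook:
    """Stands in for the hook that polls the job; falls back to a default project."""

    def wait_for_done(self, job_name: str, location: str,
                      project_id: Optional[str] = None) -> None:
        project = project_id or DEFAULT_PROJECT
        print(f"GET dataflow jobs in project={project}, location={location}")


def toy_start_python_pipeline(variables: dict) -> None:
    # The launch picks up the target project from the pipeline variables...
    print(f"launching Dataflow job in project={variables['project']}")


variables = {"project": "project-d", "region": "us-east1"}
toy_start_python_pipeline(variables)        # launches in project D: succeeds
# ...but the completion check never receives that project, so it polls project A:
ToyDataflowHook().wait_for_done(job_name="my-job", location="us-east1")
# -> GET dataflow jobs in project=project-a, location=us-east1  (403 in our setup)
```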
I think that specifying the `project_id` as a parameter in the
`wait_for_done` call may solve the issue.
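In terms of the toy above, that would amount to forwarding the launch project
to the poll (assuming the real `wait_for_done` actually accepts a `project_id`
keyword, which I haven't verified):
```python
# Continuing the toy above: forward the project used to launch the job to the poll.
ToyDataflowHook().wait_for_done(
    job_name="my-job",
    location="us-east1",
    project_id=variables["project"],  # project-d, where the job actually runs
)
# -> GET dataflow jobs in project=project-d, location=us-east1
```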
**How to reproduce it**:
- Instantiate a Composer environment in a new GCP project.
- Launch a simple Dataflow job in another project.
The Dataflow job will succeed (no errors are visible in the GCP console), but
an error will be thrown in the Airflow logs.
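For reference, a DAG along these lines reproduces it for us (the project name,
bucket, and pipeline file below are placeholders, and the exact operator
arguments may need adjusting for your provider version):
```python
# Minimal repro sketch; "project-d" is a project other than the Composer project.
from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.dataflow import (
    DataflowCreatePythonJobOperator,
)

with DAG(
    dag_id="dataflow_cross_project_repro",
    start_date=datetime(2021, 4, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    DataflowCreatePythonJobOperator(
        task_id="launch_cross_project_job",
        py_file="gs://my-bucket/wordcount.py",   # placeholder pipeline file
        job_name="cross-project-repro",
        project_id="project-d",                  # NOT the project Composer runs in
        location="us-east1",
        options={"temp_location": "gs://my-bucket/tmp/"},
    )
```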
**Note:** I am seeing a 403 because the service account associated with Airflow
does not have the `dataflow.jobs.get` permission on project A. I suspect that,
even with the correct permissions, you would get a different error (maybe a
404, since no job with that ID exists in that project), but I have no way to
test this at the moment.
**Anything else we need to know**:
This problem occurs every time I launch a Dataflow job in a project other than
the one the Composer environment runs in.