lukas-mi opened a new pull request, #41887:
URL: https://github.com/apache/airflow/pull/41887
## Summary

This PR fixes the issue of the default `location` (`us-central1`) being used instead of the user-supplied `location` argument. `location` is not passed through in several places in `providers/apache/beam/operators/beam.py` and `providers/google/cloud/operators/dataflow.py`, causing the Dataflow API calls to always use `DEFAULT_DATAFLOW_LOCATION = 'us-central1'`:
1. During job cancellation, e.g. when a `BeamRunJavaPipelineOperator` task is marked as `success`/`failed`:
- This leads to the following exception:
```
[2024-08-28, 11:24:29 UTC] {beam.py:657} INFO - Dataflow job with id: `2024-08-28_04_23_24-13750676948168932211` was requested to be cancelled.
[2024-08-28, 11:24:30 UTC] {taskinstance.py:441} ▼ Post task execution logs
[2024-08-28, 11:24:30 UTC] {taskinstance.py:2905} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 465, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 432, in _execute_callable
    return execute_callable(context=context, **execute_callable_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 401, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/providers/apache/beam/operators/beam.py", line 556, in execute
    return self.execute_sync(context)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/providers/apache/beam/operators/beam.py", line 583, in execute_sync
    self.beam_hook.start_java_pipeline(
  File "/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/providers/apache/beam/hooks/beam.py", line 315, in start_java_pipeline
    self._start_pipeline(
  File "/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/providers/apache/beam/hooks/beam.py", line 206, in _start_pipeline
    run_beam_command(
  File "/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/providers/apache/beam/hooks/beam.py", line 161, in run_beam_command
    process_fd(proc, readable_fd, log, process_line_callback, check_job_status_callback)
  File "/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/providers/apache/beam/hooks/beam.py", line 115, in process_fd
    for line in iter(fd.readline, b""):
  File "/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 2612, in signal_handler
    self.task.on_kill()
  File "/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/providers/apache/beam/operators/beam.py", line 658, in on_kill
    self.dataflow_hook.cancel_job(
  File "/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/providers/google/common/hooks/base_google.py", line 559, in inner_wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/providers/google/cloud/hooks/dataflow.py", line 1112, in cancel_job
    jobs_controller.cancel()
  File "/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/providers/google/cloud/hooks/dataflow.py", line 511, in cancel
    job for job in self.get_jobs() if job["currentState"] not in DataflowJobStatus.TERMINAL_STATES
                   ^^^^^^^^^^^^^^^
  File "/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/providers/google/cloud/hooks/dataflow.py", line 479, in get_jobs
    self._refresh_jobs()
  File "/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/providers/google/cloud/hooks/dataflow.py", line 400, in _refresh_jobs
    self._jobs = self._get_current_jobs()
                 ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/providers/google/cloud/hooks/dataflow.py", line 263, in _get_current_jobs
    return [self.fetch_job_by_id(self._job_id)]
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/providers/google/cloud/hooks/dataflow.py", line 290, in fetch_job_by_id
    .execute(num_retries=self._num_retries)
     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/bitnami/airflow/venv/lib/python3.11/site-packages/googleapiclient/_helpers.py", line 130, in positional_wrapper
    return wrapped(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/bitnami/airflow/venv/lib/python3.11/site-packages/googleapiclient/http.py", line 938, in execute
    raise HttpError(resp, content, uri=self.uri)
googleapiclient.errors.HttpError: <HttpError 404 when requesting https://dataflow.googleapis.com/v1b3/projects/my-project/locations/us-central1/jobs/2024-08-28_04_23_24-13750676948168932211?alt=json returned "(4f38a283a310abc0): Information about job 2024-08-28_04_23_24-13750676948168932211 could not be found in our system. Please double check that the API being used is projects.locations.jobs.get (https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.locations.jobs/get). If the API being used is projects.locations.jobs.get, please double check the id (2024-08-28_04_23_24-13750676948168932211) is correct. If it is please contact customer support.". Details: "(4f38a283a310abc0): Information about job 2024-08-28_04_23_24-13750676948168932211 could not be found in our system. Please double check that the API being used is projects.locations.jobs.get (https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.locations.jobs/get). If the API being used is projects.locations.jobs.get, please double check the id (2024-08-28_04_23_24-13750676948168932211) is correct. If it is please contact customer support.">
```
- I've fixed this by passing the `location` parameter through in the calls to `cancel_job` defined in `providers/google/cloud/hooks/dataflow.py` (see the hook-level sketch after this list).
2. When a new job is launched with the `CheckJobRunning.WaitForRun` setting, the operator should wait until the previous job with the same name is cancelled/drained. When a non-default `location` such as `europe-west1` is used, it is ignored, so the previous job cannot be found in that location and a new job is started anyway.
- I tried explicitly passing the location to `is_job_dataflow_running`, but because of the `@_fallback_to_location_from_variables` decorator an exception is raised about mutually exclusive parameters being specified:
```
[2024-08-28, 11:49:18 UTC] {taskinstance.py:2905} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 465, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 432, in _execute_callable
    return execute_callable(context=context, **execute_callable_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 401, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/providers/apache/beam/operators/beam.py", line 556, in execute
    return self.execute_sync(context)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/providers/apache/beam/operators/beam.py", line 574, in execute_sync
    is_running = self.dataflow_hook.is_job_dataflow_running(
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/providers/google/cloud/hooks/dataflow.py", line 125, in inner_wrapper
    raise AirflowException(
airflow.exceptions.AirflowException: The mutually exclusive parameter `location` and `region` key in `variables` parameter are both present. Please remove one.
```
- While `@_fallback_to_location_from_variables` should ensure that a fallback location value from `variables` (`pipeline_options`) is used when `location` is not supplied, this never happens because the function declares a default of `location: str = DEFAULT_DATAFLOW_LOCATION`, so `us-central1` is always used as the `location`.
- I've fixed this by removing the `@_fallback_to_location_from_variables` decorator from `is_job_dataflow_running` in `providers/google/cloud/hooks/dataflow.py`. I also removed the default `location` value of `is_job_dataflow_running` because, in my view, it causes more harm than good. In all calls to `is_job_dataflow_running` I now pass the location value available in scope, which the class constructors already default to `location: str = DEFAULT_DATAFLOW_LOCATION` anyway.
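To make the intended call shape concrete, below is a minimal, hedged sketch against the public `DataflowHook` API rather than the actual diff; the connection id, job id, project and location are placeholders taken from the logs and config in this description.

```
# Minimal sketch (not the PR diff): cancel a Dataflow job while passing the
# user-supplied location through, instead of letting the hook fall back to
# DEFAULT_DATAFLOW_LOCATION ("us-central1").
# Assumes valid GCP credentials on the referenced Airflow connection;
# the job id, project and location below are placeholders.
from airflow.providers.google.cloud.hooks.dataflow import DataflowHook

hook = DataflowHook(gcp_conn_id="google_cloud_default")
hook.cancel_job(
    job_id="2024-08-28_04_23_24-13750676948168932211",
    project_id="my-project",
    location="europe-west1",  # when this was dropped, us-central1 was queried and the 404 above was raised
)
```

The operator-side change follows the same idea: `on_kill` and the `CheckJobRunning.WaitForRun` check now pass the location that is already in scope (in my setup, the one from `DataflowConfiguration`) instead of silently using the default.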
How my testing task is configured:
```
start_beam_pipeline = BeamRunJavaPipelineOperator(
    task_id="start-beam-pipeline",
    runner="DataflowRunner",
    jar=jar_location,
    job_class=job_class,
    # https://cloud.google.com/dataflow/docs/reference/pipeline-options#java
    pipeline_options={
        "tempLocation": f"gs://my-bucket/tmp",
        "stagingLocation": f"gs://my-bucket/staging",
        "enableStreamingEngine": True,
        "workerMachineType": "n2d-standard-4",
        "maxNumWorkers": 3,
    },
    # https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/operators/dataflow/index.html#airflow.providers.google.cloud.operators.dataflow.DataflowConfiguration
    dataflow_config=DataflowConfiguration(
        job_name="{{task.task_id}}",
        append_job_name=False,
        project_id="my_project",
        location="europe-west1",
        drain_pipeline=True,
        check_if_running=CheckJobRunning.WaitForRun,
    ),
)
```
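For context, this configuration exercises both points above: `append_job_name=False` keeps the Dataflow job name constant across runs, so with `check_if_running=CheckJobRunning.WaitForRun` the operator has to find the previous run in `europe-west1` before starting a new one, and `drain_pipeline=True` goes through the `cancel_job` path when the task is killed or marked as done.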
I'm running Airflow with these changes and can confirm that `BeamRunJavaPipelineOperator` now behaves as expected, fixing both points above.
I've used the following versions for testing:
- Airflow: 2.10.0
- Beam provider: 5.8.0
- Google provider: 10.22.0