lukas-mi opened a new pull request, #41887:
URL: https://github.com/apache/airflow/pull/41887

   <!--
    Licensed to the Apache Software Foundation (ASF) under one
    or more contributor license agreements.  See the NOTICE file
    distributed with this work for additional information
    regarding copyright ownership.  The ASF licenses this file
    to you under the Apache License, Version 2.0 (the
    "License"); you may not use this file except in compliance
    with the License.  You may obtain a copy of the License at
   
      http://www.apache.org/licenses/LICENSE-2.0
   
    Unless required by applicable law or agreed to in writing,
    software distributed under the License is distributed on an
    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    KIND, either express or implied.  See the License for the
    specific language governing permissions and limitations
    under the License.
    -->
   
   <!--
   Thank you for contributing! Please make sure that your code changes
   are covered with tests. And in case of new features or big changes
   remember to adjust the documentation.
   
   Feel free to ping committers for the review!
   
   In case of an existing issue, reference it using one of the following:
   
   closes: #ISSUE
   related: #ISSUE
   
   How to write a good git commit message:
   http://chris.beams.io/posts/git-commit/
   -->
   
   ## Summary: this PR fixes an issue where the default `location` (`us-central1`) is used instead of the user-supplied `location` argument.
   
   `location` is not passed through in several places in `providers/apache/beam/operators/beam.py` and `providers/google/cloud/operators/dataflow.py`, causing the Dataflow API calls to always use `DEFAULT_DATAFLOW_LOCATION = 'us-central1'`:
   1. During job cancellation when, for example, a `BeamRunJavaPipelineOperator` job is marked as `success`/`failed`:
       - This leads to the following exception:
           ```
           [2024-08-28, 11:24:29 UTC] {beam.py:657} INFO - Dataflow job with 
id: `2024-08-28_04_23_24-13750676948168932211` was requested to be cancelled.
           [2024-08-28, 11:24:30 UTC] {taskinstance.py:441} ▼ Post task 
execution logs
           [2024-08-28, 11:24:30 UTC] {taskinstance.py:2905} ERROR - Task 
failed with exception
           Traceback (most recent call last):
           File 
"/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/models/taskinstance.py",
 line 465, in _execute_task
               result = _execute_callable(context=context, 
**execute_callable_kwargs)
                       
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
           File 
"/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/models/taskinstance.py",
 line 432, in _execute_callable
               return execute_callable(context=context, 
**execute_callable_kwargs)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
           File 
"/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/models/baseoperator.py",
 line 401, in wrapper
               return func(self, *args, **kwargs)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^
           File 
"/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/providers/apache/beam/operators/beam.py",
 line 556, in execute
               return self.execute_sync(context)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^
           File 
"/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/providers/apache/beam/operators/beam.py",
 line 583, in execute_sync
               self.beam_hook.start_java_pipeline(
           File 
"/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/providers/apache/beam/hooks/beam.py",
 line 315, in start_java_pipeline
               self._start_pipeline(
           File 
"/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/providers/apache/beam/hooks/beam.py",
 line 206, in _start_pipeline
               run_beam_command(
           File 
"/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/providers/apache/beam/hooks/beam.py",
 line 161, in run_beam_command
               process_fd(proc, readable_fd, log, process_line_callback, 
check_job_status_callback)
           File 
"/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/providers/apache/beam/hooks/beam.py",
 line 115, in process_fd
               for line in iter(fd.readline, b""):
           File 
"/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/models/taskinstance.py",
 line 2612, in signal_handler
               self.task.on_kill()
           File 
"/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/providers/apache/beam/operators/beam.py",
 line 658, in on_kill
               self.dataflow_hook.cancel_job(
           File 
"/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/providers/google/common/hooks/base_google.py",
 line 559, in inner_wrapper
               return func(self, *args, **kwargs)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^
           File 
"/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/providers/google/cloud/hooks/dataflow.py",
 line 1112, in cancel_job
               jobs_controller.cancel()
           File 
"/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/providers/google/cloud/hooks/dataflow.py",
 line 511, in cancel
               job for job in self.get_jobs() if job["currentState"] not in 
DataflowJobStatus.TERMINAL_STATES
                           ^^^^^^^^^^^^^^^
           File 
"/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/providers/google/cloud/hooks/dataflow.py",
 line 479, in get_jobs
               self._refresh_jobs()
           File 
"/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/providers/google/cloud/hooks/dataflow.py",
 line 400, in _refresh_jobs
               self._jobs = self._get_current_jobs()
                           ^^^^^^^^^^^^^^^^^^^^^^^^
           File 
"/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/providers/google/cloud/hooks/dataflow.py",
 line 263, in _get_current_jobs
               return [self.fetch_job_by_id(self._job_id)]
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
           File 
"/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/providers/google/cloud/hooks/dataflow.py",
 line 290, in fetch_job_by_id
               .execute(num_retries=self._num_retries)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
           File 
"/opt/bitnami/airflow/venv/lib/python3.11/site-packages/googleapiclient/_helpers.py",
 line 130, in positional_wrapper
               return wrapped(*args, **kwargs)
                   ^^^^^^^^^^^^^^^^^^^^^^^^
           File 
"/opt/bitnami/airflow/venv/lib/python3.11/site-packages/googleapiclient/http.py",
 line 938, in execute
               raise HttpError(resp, content, uri=self.uri)
           googleapiclient.errors.HttpError: <HttpError 404 when requesting 
https://dataflow.googleapis.com/v1b3/projects/my-project/locations/us-central1/jobs/2024-08-28_04_23_24-13750676948168932211?alt=json
 returned "(4f38a283a310abc0): Information about job 
2024-08-28_04_23_24-13750676948168932211 could not be found in our system. 
Please double check that the API being used is projects.locations.jobs.get 
(https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.locations.jobs/get).
 If the API being used is projects.locations.jobs.get, please double check the 
id (2024-08-28_04_23_24-13750676948168932211) is correct. If it is please 
contact customer support.". Details: "(4f38a283a310abc0): Information about job 
2024-08-28_04_23_24-13750676948168932211 could not be found in our system. 
Please double check that the API being used is projects.locations.jobs.get 
(https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.locations.jobs/get).
 If the API being used is
  projects.locations.jobs.get, please double check the id 
(2024-08-28_04_23_24-13750676948168932211) is correct. If it is please contact 
customer support.">
           ```
       - I've fixed this by passing the `location` parameter through in the calls to `cancel_job` defined in `providers/google/cloud/hooks/dataflow.py` (a minimal sketch follows this list).
   2. When a new job is launched with the `CheckJobRunning.WaitForRun` setting, it should wait until the previous job with the same name is cancelled/drained. When a non-default `location` such as `europe-west1` is used, it is ignored, so the previous job cannot be found in that location and the new job starts anyway.
       - I tried explicitly passing the location to `is_job_dataflow_running`, but because of the `@_fallback_to_location_from_variables` decorator an exception is raised about mutually exclusive parameters being specified:
           ```
           [2024-08-28, 11:49:18 UTC] {taskinstance.py:2905} ERROR - Task 
failed with exception
           Traceback (most recent call last):
           File 
"/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/models/taskinstance.py",
 line 465, in _execute_task
               result = _execute_callable(context=context, 
**execute_callable_kwargs)
                       
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
           File 
"/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/models/taskinstance.py",
 line 432, in _execute_callable
               return execute_callable(context=context, 
**execute_callable_kwargs)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
           File 
"/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/models/baseoperator.py",
 line 401, in wrapper
               return func(self, *args, **kwargs)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^
           File 
"/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/providers/apache/beam/operators/beam.py",
 line 556, in execute
               return self.execute_sync(context)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^
           File 
"/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/providers/apache/beam/operators/beam.py",
 line 574, in execute_sync
               is_running = self.dataflow_hook.is_job_dataflow_running(
                           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
           File 
"/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/providers/google/cloud/hooks/dataflow.py",
 line 125, in inner_wrapper
               raise AirflowException(
           airflow.exceptions.AirflowException: The mutually exclusive 
parameter `location` and `region` key in `variables` parameter are both 
present. Please remove one.
           ```
       - While `@_fallback_to_location_from_variables` should ensure that a fallback location value is taken from `variables` (`pipeline_options`) when `location` is not supplied, this never happens because the function declares the default `location: str = DEFAULT_DATAFLOW_LOCATION`, so `us-central1` is always used as the `location`.
       - I've fixed this by removing the `@_fallback_to_location_from_variables` decorator from `is_job_dataflow_running` in `providers/google/cloud/hooks/dataflow.py`. I also removed the default `location` value in `is_job_dataflow_running` because, to my mind, it causes more harm than good. In all calls to `is_job_dataflow_running` I now pass the `location` value available in scope, which defaults to `DEFAULT_DATAFLOW_LOCATION` in the class constructors anyway (a sketch of the new call site follows the test configuration below).
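   
   A minimal sketch of the first fix, assuming the operator keeps the Dataflow job id and its `DataflowConfiguration` on `self` (attribute names mirror the existing `BeamRunJavaPipelineOperator` code and should be treated as assumptions here):
   ```
   def on_kill(self) -> None:
       # Cancel the job in the location it was actually launched in, instead of letting
       # the hook fall back to DEFAULT_DATAFLOW_LOCATION ("us-central1").
       if self.dataflow_hook and self.dataflow_job_id:
           self.log.info("Dataflow job with id: `%s` was requested to be cancelled.", self.dataflow_job_id)
           self.dataflow_hook.cancel_job(
               job_id=self.dataflow_job_id,
               project_id=self.dataflow_config.project_id,
               location=self.dataflow_config.location,  # previously not passed, so us-central1 was assumed
           )
   ```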
   
   How my testing task is configured:
   ```
   from airflow.providers.apache.beam.operators.beam import BeamRunJavaPipelineOperator
   from airflow.providers.google.cloud.hooks.dataflow import CheckJobRunning
   from airflow.providers.google.cloud.operators.dataflow import DataflowConfiguration

   start_beam_pipeline = BeamRunJavaPipelineOperator(
       task_id="start-beam-pipeline",
       runner="DataflowRunner",
       jar=jar_location,
       job_class=job_class,
       # https://cloud.google.com/dataflow/docs/reference/pipeline-options#java
       pipeline_options={
           "tempLocation": f"gs://my-bucket/tmp",
           "stagingLocation": f"gs://my-bucket/staging",
           "enableStreamingEngine": True,
           "workerMachineType": "n2d-standard-4",
           "maxNumWorkers": 3,
       },
       # https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/operators/dataflow/index.html#airflow.providers.google.cloud.operators.dataflow.DataflowConfiguration
       dataflow_config=DataflowConfiguration(
           job_name="{{task.task_id}}",
           append_job_name=False,
           project_id="my_project",
           location="europe-west1",
           drain_pipeline=True,
           check_if_running=CheckJobRunning.WaitForRun
       )
   )
   ```
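   
   For reference, a hedged sketch of how the second fix changes the call site in `execute_sync`, with the decorator removed from the hook method (the exact local variable names here are assumptions):
   ```
   # Pass the location the operator already knows about instead of relying on the
   # location fallback/default inside the hook.
   is_running = self.dataflow_hook.is_job_dataflow_running(
       name=self.dataflow_config.job_name,
       variables=pipeline_options,
       location=self.dataflow_config.location,  # europe-west1 in the configuration above
   )
   ```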
   
   I'm running Airflow with these changes and can confirm that `BeamRunJavaPipelineOperator` now behaves as expected, addressing the two points above.
   
   I've used the following versions for testing:
   - Airflow: 2.10.0
   - Beam provider: 5.8.0
   - Google provider: 10.22.0
   
   
   <!-- Please keep an empty line above the dashes. -->
   ---
   **^ Add meaningful description above**
   Read the **[Pull Request 
Guidelines](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#pull-request-guidelines)**
 for more information.
   In case of fundamental code changes, an Airflow Improvement Proposal 
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvement+Proposals))
 is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party 
License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in a 
newsfragment file, named `{pr_number}.significant.rst` or 
`{issue_number}.significant.rst`, in 
[newsfragments](https://github.com/apache/airflow/tree/main/newsfragments).
   

