josh-fell opened a new pull request #16875:
URL: https://github.com/apache/airflow/pull/16875
Related to #10285
- Updated example DAG files such that `xcom_pull()` calls use an operator's
`.output` property as well access of `TaskInstance` objects from context to use
`get_current_context()` function
- Added comments to which task dependencies, if any, are handled and/or
created via `XComArgs` for transparency
- Removed or refactored the `default_args` pattern where necessary as
requested by @ashb (i.e. removed a separated `default_args` declaration for
deference for declaration as part of the `DAG` object)
- Other miscellaneous updates based on `.output` refactoring
>**Note:** There are several instances where the `xcom_pull()` call was not
updated. These instances involve accessing a specific value within the `XCom`
or calling user-defined macros with an `XCom` value. Reference #16618 for an
open issue to enhance the `XComArg` functionality to provide similar behavior
as the classic `xcom_pull()` method.
> **Note:** Not all DAGs were tested functionally (i.e. with hard
integrations to source systems and executed), however each DAG was tested to
compile and generate a DAG graph as expected locally.
An detailed summary of all changes made as part of this PR can be found
below:
| DAG File | Converted `xcom_pull()`? | Other Updates? | Comments |
| ---------| ------------------------ | -------------- | -------- |
|
airflow/providers/google/cloud/example_dags/example_automl_nl_text_classification.py
| Yes | Yes | Removed explicit task dependencies that are created via
`XComArgs`. |
|
airflow/providers/google/cloud/example_dags/example_automl_nl_text_extraction.py
| Yes | Yes | Removed explicit task dependencies that are created via
`XComArgs`. |
|
airflow/providers/google/cloud/example_dags/example_automl_nl_text_sentiment.py
| Yes | Yes | Removed explicit task dependencies that are created via
`XComArgs`. |
| airflow/providers/google/cloud/example_dags/example_automl_tables.py | Yes
** | Yes | ** Current `XComArg` object does not support accessing specific
values of an iterable `XCom` value elegantly. Did not update the following
occurrences: <br/>`"{{
extract_object_id(task_instance.xcom_pull('list_tables_spec_task')[0])
}}"`<br/>`'{{ task_instance.xcom_pull("create_dataset_task")["name"]
}}'`</br>`"{{
get_target_column_spec(task_instance.xcom_pull('list_columns_spec_task'),
target) }}"`<br/><br/>All other `xcom_pull()` calls have been updated.
<br/><br/> Removed explicit task dependencies that are created via `XComArgs`. |
| airflow/providers/google/cloud/example_dags/example_automl_translation.py
| Yes | Yes | Removed explicit task dependencies that are created via
`XComArgs`. |
|
airflow/providers/google/cloud/example_dags/example_automl_video_intelligence_classification.py
| Yes | Yes | Removed explicit task dependencies that are created via
`XComArgs`. |
|
airflow/providers/google/cloud/example_dags/example_automl_video_intelligence_tracking.py
| Yes | Yes | Removed explicit task dependencies that are created via
`XComArgs`. |
|
airflow/providers/google/cloud/example_dags/example_automl_vision_classification.py
| Yes | Yes | Removed explicit task dependencies that are created via
`XComArgs`. |
|
airflow/providers/google/cloud/example_dags/example_automl_vision_object_detection.py
| Yes | Yes | Removed explicit task dependencies that are created via
`XComArgs`. |
|
airflow/providers/google/cloud/example_dags/example_azure_fileshare_to_gcs.py |
No | Yes | Refactored `default_args` pattern. |
| airflow/providers/google/cloud/example_dags/example_bigquery_dts.py | Yes
| Yes | Removed explicit task dependencies that are created via `XComArgs`. |
| airflow/providers/google/cloud/example_dags/example_bigquery_operations.py
| No | No | Current `XComArg` object does not support accessing specific values
of an iterable `XCom` value elegantly. Did not update any occurrences in this
DAG. |
| airflow/providers/google/cloud/example_dags/example_bigquery_queries.py |
Yes | Yes | Added globals assignment so both example DAGs are exposed rather
than only the last one in the loop. |
| airflow/providers/google/cloud/example_dags/example_cloud_build.py | No |
No | Current `XComArg` object does not support accessing specific values of an
iterable `XCom` value elegantly. Did not update any occurrences in this DAG. |
| airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py |
Yes ** | No | ** Current `XComArg` object does not support accessing specific
values of an iterable `XCom` value elegantly. Did not update this occurrence:
<br/>`"user-{{
task_instance.xcom_pull('get-instance')['persistence_iam_identity']"`
<br/><br/>All other `xcom_pull()` calls have been updated. |
| airflow/providers/google/cloud/example_dags/example_datacatalog.py | Yes
** | Yes | ** Current `XComArg` object does not support accessing specific
values of an iterable `XCom` value elegantly. Did not update this occurrence:
</br>`"echo \"{{ task_instance.xcom_pull('lookup_entry')['display_name'] }}\""`
<br/><br/>All other `xcom_pull()` calls have been updated. |
| airflow/providers/google/cloud/example_dags/example_cloud_sql.py | Yes |
Yes | Removed explicit task dependencies that are created via `XComArgs`.
</br></br>Removed duplicate/redundant task. |
|
airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_aws.py
| No | Yes | Current `XComArg` object does not support accessing specific
values of an iterable `XCom` value elegantly. Did not update any occurrences in
this DAG. </br></br>Removed unused `default_args` variable. |
|
airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_gcp.py
| No | No | Current `XComArg` object does not support accessing specific
values of an iterable `XCom` value elegantly. Did not update any occurrences in
this DAG. |
|
airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_gcp.py
| Yes ** | Yes | ** Current `XComArg` object does not support accessing
specific values of an iterable `XCom` value elegantly. Did not update this
occurrence: <br/>`"echo \"{{
task_instance.xcom_pull('lookup_entry')['display_name'] }}\""` </br></br>All
other `xcom_pull()` calls have been updated. |
| airflow/providers/google/cloud/example_dags/example_dataflow.py| No | No |
Current `XComArg` object does not support accessing specific values of an
iterable `XCom` value elegantly. Did not update any occurrences in this DAG. |
| airflow/providers/google/cloud/example_dags/example_dataproc.py | Yes |
Yes | Removed explicit task dependencies that are created via `XComArgs`. |
| airflow/providers/google/cloud/example_dags/example_datastore.py | Yes **
| Yes | ** Current `XComArg` object does not support accessing specific values
of an iterable `XCom` value elegantly. Did not update these occurrences: </br>`
"{{task_instance.xcom_pull('export_task')['response']['outputUrl'].split('/')[2]
}}"` </br>`"{{
'/'.join(task_instance.xcom_pull('export_task')['response']['outputUrl'].split('/')[3:])
}}"` </br></br>All other `xcom_pull()` calls have been updated.
</br></br>Removed explicit task dependencies that are created via `XComArgs`. |
| airflow/providers/google/cloud/example_dags/example_functions.py | No |
Yes | Added `default_args` to DAG as there was logic to populate the dict but
not being applied. |
| airflow/providers/google/cloud/example_dags/example_dlp.py | No | No |
Current `XComArg` object does not support accessing specific values of an
iterable `XCom` value elegantly. Did not update any occurrences in this DAG. |
| airflow/providers/google/cloud/example_dags/example_gcs.py | Yes | No ||
| airflow/providers/google/cloud/example_dags/example_kubernetes_engine.py |
No | No | Current `XComArg` object does not support accessing specific values
of an iterable `XCom` value elegantly. Did not update any occurrences in this
DAG. |
| airflow/providers/google/cloud/example_dags/example_mlengine.py | Yes |
Yes | Removed unneeded `default_args` pattern. |
| airflow/providers/google/cloud/example_dags/example_natural_language.py |
Yes | No ||
| airflow/providers/google/cloud/example_dags/example_pubsub.py | Yes ** |
Yes | ** Current `XComArg` object does not support accessing specific values of
an iterable `XCom` value elegantly. Did not update the following occurrence:
<br/>`"""{% for m in task_instance.xcom_pull('pull_messages') %} echo "AckID:
{{ m.get('ackId') }}, Base64-Encoded: {{ m.get('message') }}" {% endfor
%}"""`<br/><br/>All other `xcom_pull()` calls have been updated. <br/><br/>
Removed explicit task dependencies that are created via `XComArgs`. |
| airflow/providers/google/cloud/example_dags/example_stackdriver.py | No |
No | Current `XComArg` object does not support accessing specific values of an
iterable `XCom` value elegantly. Did not update any occurrences in this DAG. |
| airflow/providers/google/cloud/example_dags/example_tasks.py | Yes | No ||
| airflow/providers/google/cloud/example_dags/example_translate.py | No | No
| Current `XComArg` object does not support accessing specific values of an
iterable `XCom` value elegantly. Did not update any occurrences in this DAG. |
| airflow/providers/google/cloud/example_dags/example_video_intelligence.py
| No | No | Current `XComArg` object does not support accessing specific values
of an iterable `XCom` value elegantly. Did not update any occurrences in this
DAG. |
| airflow/providers/google/cloud/example_dags/example_vision.py | Yes ** |
Yes | ** Current `XComArg` object does not support accessing specific values of
an iterable `XCom` value elegantly. Did not update the following occurrences:
<br/>`"echo {{
task_instance.xcom_pull('annotate_image')['logoAnnotations'][0]['description']
}}"` </br>`"echo {{
task_instance.xcom_pull('detect_text')['textAnnotations'][0] }}"` </br>`"echo
{{ task_instance.xcom_pull('document_detect_text')['textAnnotations'][0] }}"`
</br>`"echo {{ task_instance.xcom_pull('detect_labels')['labelAnnotations'][0]
}}"`<br/><br/>All other `xcom_pull()` calls have been updated. <br/><br/>
Removed explicit task dependencies that are created via `XComArgs`. |
| airflow/providers/google/cloud/example_dags/example_workflows.py | Yes |
Yes | Removed explicit task dependencies that are created via `XComArgs`. |
---
**^ Add meaningful description above**
Read the **[Pull Request
Guidelines](https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#pull-request-guidelines)**
for more information.
In case of fundamental code change, Airflow Improvement Proposal
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals))
is needed.
In case of a new dependency, check compliance with the [ASF 3rd Party
License Policy](https://www.apache.org/legal/resolved.html#category-x).
In case of backwards incompatible changes please leave a note in
[UPDATING.md](https://github.com/apache/airflow/blob/main/UPDATING.md).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]