josh-fell opened a new issue #16618:
URL: https://github.com/apache/airflow/issues/16618


   
   **Description**
   Currently the `output` property of operators doesn't support accessing a
   specific value within an `XCom`, only the _entire_ `XCom` value. Ideally,
   calling the `XComArg` via the `output` property would behave the same as the
   `task_instance.xcom_pull()` method, which gives a user immediate access to
   the `XCom` value and lets them directly access specific values in that
   `XCom`.
   
   For example, in the
   [example DAG](https://github.com/apache/airflow/blob/main/airflow/providers/apache/beam/example_dags/example_beam.py)
   in the Apache Beam provider, the `job_id` arg in the `DataflowJobStatusSensor`
   task is a templated value that uses the `task_instance.xcom_pull()` method and
   then accesses the `dataflow_job_id` key within the `XCom` value:
   ```python
   start_python_job_dataflow_runner_async = BeamRunPythonPipelineOperator(
       task_id="start_python_job_dataflow_runner_async",
       runner="DataflowRunner",
       py_file=GCS_PYTHON_DATAFLOW_ASYNC,
       pipeline_options={
           'tempLocation': GCS_TMP,
           'stagingLocation': GCS_STAGING,
           'output': GCS_OUTPUT,
       },
       py_options=[],
       py_requirements=['apache-beam[gcp]==2.26.0'],
       py_interpreter='python3',
       py_system_site_packages=False,
       dataflow_config=DataflowConfiguration(
           job_name='{{task.task_id}}',
           project_id=GCP_PROJECT_ID,
           location="us-central1",
           wait_until_finished=False,
       ),
   )

   wait_for_python_job_dataflow_runner_async_done = DataflowJobStatusSensor(
       task_id="wait-for-python-job-async-done",
       job_id="{{task_instance.xcom_pull('start_python_job_dataflow_runner_async')['dataflow_job_id']}}",
       expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
       project_id=GCP_PROJECT_ID,
       location='us-central1',
   )
   ```
   There is currently no equivalent way to directly access the `dataflow_job_id`
   value in the same manner using the `output` property.
   
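   Using `start_python_job_dataflow_runner_async.output["dataflow_job_id"]`
   yields the equivalent of
   `task_instance.xcom_pull(task_ids='start_python_job_dataflow_runner_async', key='dataflow_job_id')`,
   which looks up an `XCom` stored under the key `dataflow_job_id` rather than
   indexing into the task's returned value.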
   
   Even
   `start_python_job_dataflow_runner_async.output["return_value"]["dataflow_job_id"]`
   yields the same result:
   `task_instance.xcom_pull(task_ids='start_python_job_dataflow_runner_async', key='dataflow_job_id')`.
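
To make the difference concrete, here is a minimal toy model in plain Python. It does not import Airflow: `XCOM_STORE`, `xcom_pull`, and `ToyXComArg` are hypothetical stand-ins written for illustration, the job ID is made up, and the `__getitem__` behavior is an assumption that mirrors what is described above rather than a copy of Airflow's implementation.

```python
from dataclasses import dataclass
from typing import Any, Dict, Tuple

# Toy stand-in for the XCom table: (task_id, key) -> stored value.
XCOM_STORE: Dict[Tuple[str, str], Any] = {
    ("start_python_job_dataflow_runner_async", "return_value"): {
        "dataflow_job_id": "job-123",  # made-up value for illustration
    },
}


def xcom_pull(task_ids: str, key: str = "return_value") -> Any:
    """Hypothetical simplification of task_instance.xcom_pull()."""
    return XCOM_STORE.get((task_ids, key))


@dataclass(frozen=True)
class ToyXComArg:
    """Hypothetical model of the behavior described in this issue."""

    task_id: str
    key: str = "return_value"

    def __getitem__(self, item: str) -> "ToyXComArg":
        # Indexing swaps the XCom *key*; it does not index into the value.
        return ToyXComArg(self.task_id, key=item)

    def resolve(self) -> Any:
        return xcom_pull(self.task_id, key=self.key)


task_id = "start_python_job_dataflow_runner_async"
output = ToyXComArg(task_id)

# Templated style: pull the whole return value, then index into it.
templated = xcom_pull(task_id)["dataflow_job_id"]

# `output[...]` style: both forms resolve to key='dataflow_job_id',
# under which nothing was pushed in this toy store.
via_output = output["dataflow_job_id"].resolve()
chained = output["return_value"]["dataflow_job_id"].resolve()
```

In this sketch only `templated` reaches the job ID. Both `output[...]` forms collapse to the same lookup, because each index replaces the key instead of descending into the stored dictionary.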
   
   **Use case / motivation**
   It's functionally intuitive for users to have direct access to the specific 
values in an `XCom` related to the `XComArg` via the Taskflow API as with the 
classic `xcom_pull()` method.
   
   **Are you willing to submit a PR?**
   I would love to but I would certainly need some guidance on nuances here.
   
   **Related Issues**
   https://github.com/apache/airflow/issues/10285
   



