josh-fell opened a new issue #16618: URL: https://github.com/apache/airflow/issues/16618
**Description**

Currently, the `output` property of operators doesn't support accessing a specific value within an `XCom`; it only gives access to the _entire_ `XCom` value. Ideally, calling the `XComArg` via the `output` property would behave the same as the `task_instance.xcom_pull()` method, where a user has immediate access to the `XCom` value and can directly access specific values within it.

For example, in the [example DAG](https://github.com/apache/airflow/blob/main/airflow/providers/apache/beam/example_dags/example_beam.py) in the Apache Beam provider, the `job_id` arg of the `DataflowJobStatusSensor` task is a templated value that uses the `task_instance.xcom_pull()` method and then accesses the `dataflow_job_id` key within the `XCom` value:

```python
start_python_job_dataflow_runner_async = BeamRunPythonPipelineOperator(
    task_id="start_python_job_dataflow_runner_async",
    runner="DataflowRunner",
    py_file=GCS_PYTHON_DATAFLOW_ASYNC,
    pipeline_options={
        'tempLocation': GCS_TMP,
        'stagingLocation': GCS_STAGING,
        'output': GCS_OUTPUT,
    },
    py_options=[],
    py_requirements=['apache-beam[gcp]==2.26.0'],
    py_interpreter='python3',
    py_system_site_packages=False,
    dataflow_config=DataflowConfiguration(
        job_name='{{task.task_id}}',
        project_id=GCP_PROJECT_ID,
        location="us-central1",
        wait_until_finished=False,
    ),
)

wait_for_python_job_dataflow_runner_async_done = DataflowJobStatusSensor(
    task_id="wait-for-python-job-async-done",
    job_id="{{task_instance.xcom_pull('start_python_job_dataflow_runner_async')['dataflow_job_id']}}",
    expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
    project_id=GCP_PROJECT_ID,
    location='us-central1',
)
```

There is currently no equivalent way to directly access the `dataflow_job_id` value in the same manner using the `output` property. Using `start_python_job_dataflow_runner_async.output["dataflow_job_id"]` yields the equivalent of `task_instance.xcom_pull(task_ids='start_python_job_dataflow_runner_async', key='dataflow_job_id')`. Even `start_python_job_dataflow_runner_async.output["return_value"]["dataflow_job_id"]` yields the same result: `task_instance.xcom_pull(task_ids='start_python_job_dataflow_runner_async', key='dataflow_job_id')`.

**Use case / motivation**

It's functionally intuitive for users to have direct access to specific values in the `XCom` behind an `XComArg` when using the TaskFlow API, just as with the classic `xcom_pull()` method.

**Are you willing to submit a PR?**

I would love to, but I would certainly need some guidance on the nuances here.

**Related Issues**

https://github.com/apache/airflow/issues/10285
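
The difference described in this issue can be illustrated with a minimal, Airflow-free sketch. It does not use the real Airflow API: `XCOM_STORE`, the simplified `xcom_pull()` function, and the job ID `job-123` are hypothetical stand-ins, and the sketch only assumes that each `XCom` is stored under a `(task_id, key)` pair with `return_value` as the default key.

```python
# Hypothetical in-memory stand-in for stored XComs: (task_id, key) -> value.
XCOM_STORE = {
    ("start_python_job_dataflow_runner_async", "return_value"): {
        "dataflow_job_id": "job-123",  # made-up ID for illustration only
    },
}


def xcom_pull(task_ids, key="return_value"):
    """Simplified model of task_instance.xcom_pull(): look up one XCom by key."""
    return XCOM_STORE.get((task_ids, key))


task = "start_python_job_dataflow_runner_async"

# Classic templated form: pull the whole return value, then index into it.
classic = xcom_pull(task)["dataflow_job_id"]

# What .output["dataflow_job_id"] resolves to according to the issue:
# a lookup by XCom key, which finds nothing under that key.
via_output = xcom_pull(task_ids=task, key="dataflow_job_id")

print(classic)     # job-123
print(via_output)  # None
```

In this model, the subscript on `output` selects a different `XCom` key instead of indexing into the returned dictionary, which is why the two forms are not interchangeable.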
