luke-hoffman1 opened a new issue, #47579:
URL: https://github.com/apache/airflow/issues/47579
### Apache Airflow Provider(s)
openlineage, google
### Versions of Apache Airflow Providers
apache-airflow-providers-google==10.26.0
apache-airflow-providers-openlineage==2.1.0
### Apache Airflow version
2.10.5
### Operating System
macOS Sequoia Version 15.2 (24C101)
### Deployment
Astronomer
### Deployment details
FROM quay.io/astronomer/astro-runtime:12.7.1
### What happened
When I call a BigQuery routine (stored procedure) that contains multiple
CTAS statements using `BigQueryInsertJobOperator`, OpenLineage produces a
single COMPLETE task event with aggregated inputs and outputs. The problem
is that you get no lineage at the per-CTAS ("sub-task") level. For example:
- CTAS 1: creates output_table1 from input_table1
- CTAS 2: creates output_table2 from input_table2
mock OL event:
```
{
  "inputs": [
    {
      "namespace": "bigquery",
      "name": "input_table1"
    },
    {
      "namespace": "bigquery",
      "name": "input_table2"
    }
  ],
  "outputs": [
    {
      "namespace": "bigquery",
      "name": "output_table1"
    },
    {
      "namespace": "bigquery",
      "name": "output_table2"
    }
  ]
}
```
As emitted, it looks as if both input_table1 and input_table2 fed both
output tables, which is not accurate at the "sub-task" level.
### What you think should happen instead
I'm not certain whether this is a bug or a design/implementation gap. The
stored procedure operates at a "sub-task" level, while OpenLineage appears
to emit events only at the DAG and task levels. I'm not aware of any prior
discussion on this topic, but ideally OpenLineage would emit sub-task events
that break inputs and outputs down to match the individual read and write
operations performed by each CTAS statement in the stored procedure.
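For the two-CTAS example above, a per-statement breakdown might look like the
following. This is only a sketch of the desired shape, not an existing
OpenLineage event type; run-level fields are omitted for brevity:
```
[
  {
    "inputs": [{"namespace": "bigquery", "name": "input_table1"}],
    "outputs": [{"namespace": "bigquery", "name": "output_table1"}]
  },
  {
    "inputs": [{"namespace": "bigquery", "name": "input_table2"}],
    "outputs": [{"namespace": "bigquery", "name": "output_table2"}]
  }
]
```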
### How to reproduce
DAG:
```
from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryInsertJobOperator,
)

dag = DAG(
    dag_id="dag_execute_bq_stored_proc",
    schedule_interval=None,
    start_date=datetime(2025, 3, 4),
)

stored_proc = "<bq-project-id>.<bq-dataset>.<bq-routine>"

task1 = BigQueryInsertJobOperator(
    task_id="task1",
    gcp_conn_id="bq_conn",
    configuration={
        "query": {
            "query": f"CALL `{stored_proc}`();",
            "useLegacySql": False,
            "priority": "BATCH",
        }
    },
    dag=dag,
)
```
BigQuery Routine/Stored Procedure
```
BEGIN
  -- Create a dummy table
  CREATE TEMP TABLE dummy_table AS
  SELECT 1 AS id, 'row1' AS value
  UNION ALL
  SELECT 2 AS id, 'row2' AS value
  UNION ALL
  SELECT 3 AS id, 'row3' AS value;

  -- Create a second dummy table
  CREATE TEMP TABLE dummy_table2 AS
  SELECT 1 AS id, 'row1' AS value
  UNION ALL
  SELECT 2 AS id, 'row2' AS value
  UNION ALL
  SELECT 3 AS id, 'row3' AS value;

  -- Read each dummy table and create another table via CTAS
  CREATE OR REPLACE TABLE <bq-dataset>.another_table AS
  SELECT * FROM dummy_table;

  CREATE OR REPLACE TABLE <bq-dataset>.another_table2 AS
  SELECT * FROM dummy_table2;
END
```
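As a rough illustration of the per-statement breakdown requested above, the
helper below maps each statement's referenced tables and destination table to
its own OpenLineage-style inputs/outputs pair. The function name and the input
shape are hypothetical, not part of any provider API. In practice the
per-statement information could plausibly be recovered from BigQuery itself,
since a scripted `CALL` runs as a parent job with one child job per statement,
but this sketch takes it as plain data:
```python
def per_statement_events(statements):
    """Build one OpenLineage-style inputs/outputs pair per statement.

    `statements` is a list of (referenced_tables, destination_table)
    tuples. Both this function and the input shape are hypothetical
    illustrations of the desired sub-task breakdown.
    """
    events = []
    for referenced, destination in statements:
        events.append({
            "inputs": [
                {"namespace": "bigquery", "name": t} for t in referenced
            ],
            "outputs": [
                {"namespace": "bigquery", "name": destination}
            ],
        })
    return events


# The two CTAS statements from the example above, expressed as data:
events = per_statement_events([
    (["input_table1"], "output_table1"),
    (["input_table2"], "output_table2"),
])
```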
### Anything else
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [x] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)