luke-hoffman1 opened a new issue, #47579:
URL: https://github.com/apache/airflow/issues/47579

   ### Apache Airflow Provider(s)
   
   openlineage, google
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-google==10.26.0
   apache-airflow-providers-openlineage==2.1.0
   
   ### Apache Airflow version
   
   2.10.5
   
   ### Operating System
   
   macOS Sequoia Version 15.2 (24C101)
   
   ### Deployment
   
   Astronomer
   
   ### Deployment details
   
   FROM quay.io/astronomer/astro-runtime:12.7.1
   
   ### What happened
   
When I call a BigQuery routine (stored procedure) that contains multiple CTAS statements using `BigQueryInsertJobOperator`, OpenLineage produces a single COMPLETE task event with aggregated inputs and outputs. The problem is that this loses lineage at the per-CTAS ("sub-task") level. For example,
   
   CTAS 1: creates output_table1 from input_table1
   CTAS 2: creates output_table2 from input_table2
   
   mock OL event:
   
   ```json
   {
       "inputs": [
           {
               "namespace": "bigquery",
               "name": "input_table1"
           },
           {
               "namespace": "bigquery",
               "name": "input_table2"
           }
       ],
       "outputs": [
           {
               "namespace": "bigquery",
               "name": "output_table1"
           },
           {
               "namespace": "bigquery",
               "name": "output_table2"
           }
       ]
   }
   ```
   
   Now it looks as if input_table1 and input_table2 each fed both output tables, but that isn't accurate from the per-statement ("sub-task") perspective.
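
To make the distortion concrete, here is a small sketch (plain Python with the illustrative table names from the example above — not any provider API) comparing the lineage edges a consumer would infer from the aggregated task-level event against the edges that actually exist per statement:

```python
# Per-CTAS lineage as it actually happens inside the routine.
statements = [
    {"inputs": ["input_table1"], "outputs": ["output_table1"]},
    {"inputs": ["input_table2"], "outputs": ["output_table2"]},
]

# Task-level aggregation, as in the COMPLETE event above: the pairing is lost.
aggregated = {
    "inputs": sorted({i for s in statements for i in s["inputs"]}),
    "outputs": sorted({o for s in statements for o in s["outputs"]}),
}

# Edges a consumer would infer from the aggregated event: every input
# appears to feed every output (a cross product).
implied_edges = {(i, o) for i in aggregated["inputs"] for o in aggregated["outputs"]}

# Edges that actually exist, one per CTAS statement.
true_edges = {(i, o) for s in statements for i in s["inputs"] for o in s["outputs"]}

print(len(implied_edges), len(true_edges))  # 4 implied vs 2 real
```

With two CTAS statements the aggregated event doubles the edge count; the spurious edges (e.g. input_table1 → output_table2) are exactly the lineage errors described above.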
   
   ### What you think should happen instead
   
   I'm not certain whether this is a bug or a design/implementation gap. The stored procedure operates at a "sub-task" level, while OpenLineage appears to emit events only at the DAG and task levels. I'm not aware of prior discussions on this topic, but ideally OpenLineage would emit sub-task events that break inputs and outputs down per CTAS statement, matching the read and write operations the stored procedure actually performs.
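
For instance, instead of the single aggregated event above, the breakdown could look something like two per-statement payloads (a mock in the same spirit as the event above; the per-statement event shape is illustrative, not an existing OpenLineage facet):

```
{
    "inputs": [{"namespace": "bigquery", "name": "input_table1"}],
    "outputs": [{"namespace": "bigquery", "name": "output_table1"}]
}
{
    "inputs": [{"namespace": "bigquery", "name": "input_table2"}],
    "outputs": [{"namespace": "bigquery", "name": "output_table2"}]
}
```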
   
   ### How to reproduce
   
   DAG:
   
   ```python
   from airflow import DAG
   from airflow.providers.google.cloud.operators.bigquery import (
       BigQueryInsertJobOperator
   )
   from datetime import datetime
   
   dag = DAG(
       dag_id="dag_execute_bq_stored_proc",
       schedule=None,
       start_date=datetime(2025, 3, 4),
   )
   
   stored_proc = (
       "<bq-project-id>.<bq-dataset>.<bq-routine>"
   )
   
   task1 = BigQueryInsertJobOperator(
       task_id="task1",
       gcp_conn_id="bq_conn",
       configuration={
           "query": {
               "query": f"CALL `{stored_proc}`();",
               "useLegacySql": False,
               "priority": "BATCH",
           }
       },
       dag=dag,
   )
   
   task1
   ```
   
   BigQuery routine (stored procedure):
   
   ```sql
   BEGIN
     -- Create a dummy table
     CREATE TEMP TABLE dummy_table AS
     SELECT 1 AS id, 'row1' AS value
     UNION ALL
     SELECT 2 AS id, 'row2' AS value
     UNION ALL
     SELECT 3 AS id, 'row3' AS value;
   
      -- Create a second dummy table
     CREATE TEMP TABLE dummy_table2 AS
     SELECT 1 AS id, 'row1' AS value
     UNION ALL
     SELECT 2 AS id, 'row2' AS value
     UNION ALL
     SELECT 3 AS id, 'row3' AS value;
   
     -- Read the dummy table and create another table via CTAS
     CREATE OR REPLACE TABLE <bq-dataset>.another_table AS
     SELECT * FROM dummy_table;
   
     CREATE OR REPLACE TABLE <bq-dataset>.another_table2 AS
     SELECT * FROM dummy_table2;
   END
   ```
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   