[ https://issues.apache.org/jira/browse/SPARK-57352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18087904#comment-18087904 ]

Bernardo Alemar commented on SPARK-57352:
-----------------------------------------

Please feel free to reach out to me anytime. Implementing SDP would be 
game-changing for me.

> [SDP] Pipeline execution order is broken because lineage is not inferred from 
> spark.table / spark.readStream.table
> ------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-57352
>                 URL: https://issues.apache.org/jira/browse/SPARK-57352
>             Project: Spark
>          Issue Type: Bug
>          Components: Connect, Declarative Pipelines
>    Affects Versions: 4.1.1
>         Environment: - {*}Spark Version{*}: 4.1.1
> - {*}Execution Mode{*}: Spark Connect
> - {*}Deployment Platform{*}: AWS EKS (Elastic Kubernetes Service)
> - {*}Catalog Metastore{*}: AWS Glue Data Catalog
> - {*}Table Format{*}: Apache Iceberg
> - {*}Apache Iceberg Version{*}: 1.10.1
> - {*}AWS Bundle Version{*}: 1.10.1
> - {*}Spark Packages{*}: 
>   org.apache.iceberg:iceberg-spark-runtime-4.1_2.13:1.10.1
>   org.apache.iceberg:iceberg-aws-bundle:1.10.1
>            Reporter: Bernardo Alemar
>            Priority: Major
>              Labels: pyspark, spark
>
> *Error Description*
> When building a pipeline with the new Declarative Pipelines framework
> (`pyspark.pipelines`), the execution engine does not infer the dependency
> (DAG lineage) between datasets when a downstream table (e.g., Silver layer)
> reads an upstream table (e.g., Bronze layer) through `spark.table()` or
> `spark.readStream.table()`.
> Because the framework does not analyze these table identifier strings to
> resolve internal pipeline dependencies, it schedules downstream materialized
> views/tables to run *before* their upstream dependencies, which breaks the
> pipeline execution order.
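A minimal sketch of why the inferred lineage decides execution order, assuming each dataset is modeled as the set of table names its query function reads. This is a hypothetical model for illustration, not the `pyspark.pipelines` implementation; `execution_order` is a made-up name. It uses Python's standard `graphlib.TopologicalSorter`, and references to tables outside the pipeline add no edge, so a dataset whose upstream reference goes unrecognized gets no ordering constraint at all.

```python
from graphlib import TopologicalSorter


def execution_order(datasets: dict[str, set[str]]) -> list[str]:
    """Order datasets so each one runs after the pipeline datasets it reads.

    Hypothetical model: `datasets` maps a dataset name to the table names
    its query function reads. References to names not defined in the
    pipeline (external tables) create no edge.
    """
    graph = {
        name: {ref for ref in refs if ref in datasets}
        for name, refs in datasets.items()
    }
    # TopologicalSorter takes node -> predecessors and yields predecessors first.
    return list(TopologicalSorter(graph).static_order())


if __name__ == "__main__":
    # Silver datasets are deliberately listed before the Bronze ones.
    pipeline = {
        "teodoro_silver.user_data": {"teodoro_raw.user_data"},
        "teodoro_silver.exchange": {"teodoro_raw.exchange"},
        "teodoro_raw.user_data": set(),
        "teodoro_raw.exchange": set(),
    }
    print(execution_order(pipeline))
```

With the edges present, the Bronze datasets come out first even though the Silver ones are listed first; if an edge is missing, nothing forces the Silver dataset to wait for its source.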
>  
> *Steps to Reproduce*
> Consider the following pipeline script where Silver tables depend on Bronze 
> raw tables:
> {code:python}
> from pyspark import pipelines as dp
> from pyspark.sql import DataFrame, SparkSession
> from pyspark.sql.functions import col
>
> spark = SparkSession.active()
> BUCKET = "spark-poc-ingest-teste"
>
> # 1. BRONZE LAYER (raw data)
> @dp.materialized_view
> def user_data() -> DataFrame:
>     return spark.read.format("parquet").load(f"s3a://{BUCKET}/teodoro/mock_data_materialized_view")
>
> @dp.table
> def exchange() -> DataFrame:
>     return spark.readStream.format("parquet").load(f"s3a://{BUCKET}/teodoro/exchange_data")
>
> # 2. SILVER LAYER (the engine fails to infer that it depends on the Bronze layer above)
> @dp.materialized_view(name="teodoro_silver.user_data")
> def silver_user_data() -> DataFrame:
>     # Reading via spark.table hides the lineage from the dp engine
>     return spark.table("teodoro_raw.user_data")
>
> @dp.table(name="teodoro_silver.exchange")
> def silver_exchange() -> DataFrame:
>     # Reading via spark.readStream.table hides the lineage from the dp engine
>     return spark.readStream.table("teodoro_raw.exchange").withColumn("time", col("time").cast("timestamp"))
> {code}
> *Expected Behavior*
> The Declarative Pipelines engine should parse or intercept `spark.table()` /
> `spark.readStream.table()` calls inside functions decorated with `@dp`, or
> provide a mechanism to declare dependencies explicitly, so that the Bronze
> tables (`user_data`, `exchange`) are executed and populated *before* the
> Silver tables start running.
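One way an engine could analyze table identifier strings is to qualify every dataset name and every `spark.table()`-style reference to a full `catalog.schema.table` name before matching them. The sketch below is hypothetical: `qualify`, `internal_dependencies`, and the `glue_catalog` catalog name are made up for illustration. It assumes a default catalog and schema are known for the pipeline, lowercases names on the assumption of case-insensitive matching (Spark's default, `spark.sql.caseSensitive=false`), and does not handle backtick-quoted identifiers.

```python
def qualify(identifier: str, default_catalog: str, default_schema: str) -> str:
    """Expand a 1-, 2- or 3-part identifier to lowercase catalog.schema.table.

    Simplification: assumes no backtick-quoted parts containing dots.
    """
    parts = identifier.split(".")
    if len(parts) == 1:
        parts = [default_catalog, default_schema, *parts]
    elif len(parts) == 2:
        parts = [default_catalog, *parts]
    elif len(parts) != 3:
        raise ValueError(f"unsupported identifier: {identifier!r}")
    # Assumes case-insensitive matching (Spark's default behavior).
    return ".".join(part.lower() for part in parts)


def internal_dependencies(
    datasets: dict[str, set[str]], default_catalog: str, default_schema: str
) -> dict[str, set[str]]:
    """Map each qualified dataset name to the qualified pipeline datasets it reads.

    References that do not match a dataset defined in the pipeline are
    treated as external tables and dropped.
    """
    qualified = {
        qualify(name, default_catalog, default_schema): {
            qualify(ref, default_catalog, default_schema) for ref in refs
        }
        for name, refs in datasets.items()
    }
    return {
        name: {ref for ref in refs if ref in qualified}
        for name, refs in qualified.items()
    }


if __name__ == "__main__":
    # Mirrors the script in this issue: Bronze names are unqualified.
    pipeline = {
        "user_data": set(),
        "exchange": set(),
        "teodoro_silver.user_data": {"teodoro_raw.user_data"},
        "teodoro_silver.exchange": {"teodoro_raw.exchange"},
    }
    print(internal_dependencies(pipeline, "glue_catalog", "teodoro_raw"))
    print(internal_dependencies(pipeline, "glue_catalog", "default"))
```

In this model, a Silver-to-Bronze edge exists only when both sides qualify to the same name: with a default schema of `teodoro_raw` the references resolve, while with any other default schema they are treated as external and no edge is created.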
>  
> *Actual Behavior*
> The pipeline starts executing Silver tasks first (e.g., 
> `teodoro_silver.user_data`), leading to execution failures because the 
> underlying raw tables/views have not been initialized or updated yet by the 
> framework.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to