[
https://issues.apache.org/jira/browse/SPARK-57352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18087904#comment-18087904
]
Bernardo Alemar commented on SPARK-57352:
-----------------------------------------
Please feel free to reach out to me anytime. Implementing SDP would be
game-changing for me.
> [SDP] Pipeline execution order is broken because lineage is not inferred from
> spark.table / spark.readStream.table
> ------------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-57352
> URL: https://issues.apache.org/jira/browse/SPARK-57352
> Project: Spark
> Issue Type: Bug
> Components: Connect, Declarative Pipelines
> Affects Versions: 4.1.1
> Environment:
> - Spark Version: 4.1.1
> - Execution Mode: Spark Connect
> - Deployment Platform: AWS EKS (Elastic Kubernetes Service)
> - Catalog Metastore: AWS Glue Data Catalog
> - Table Format: Apache Iceberg
> - Apache Iceberg Version: 1.10.1
> - AWS Bundle Version: 1.10.1
> - Spark Packages:
>   org.apache.iceberg:iceberg-spark-runtime-4.1_2.13:1.10.1
>   org.apache.iceberg:iceberg-aws-bundle:1.10.1
> Reporter: Bernardo Alemar
> Priority: Major
> Labels: pyspark, spark
>
> *Error Description*
> When building a pipeline using the new Declarative Pipelines framework
> (`pyspark.pipelines`), the execution engine fails to correctly infer
> dependencies (DAG lineage) if a downstream table (e.g., Silver layer)
> references an upstream table (e.g., Bronze layer) using `spark.table()` or
> `spark.readStream.table()`.
> Because the framework does not analyze the table identifier strings to
> resolve internal pipeline dependencies, it schedules downstream materialized
> views/tables to run *before* their upstream dependencies, so the pipeline
> executes in the wrong order.
>
> *Steps to Reproduce*
> Consider the following pipeline script where Silver tables depend on Bronze
> raw tables:
> {code:python}
> from pyspark import pipelines as dp
> from pyspark.sql import DataFrame, SparkSession
> from pyspark.sql.functions import col
>
> spark = SparkSession.active()
> BUCKET = "spark-poc-ingest-teste"
>
>
> # 1. BRONZE LAYER (Raw data)
> @dp.materialized_view
> def user_data() -> DataFrame:
>     return spark.read.format("parquet").load(
>         f"s3a://{BUCKET}/teodoro/mock_data_materialized_view"
>     )
>
>
> @dp.table
> def exchange() -> DataFrame:
>     return spark.readStream.format("parquet").load(
>         f"s3a://{BUCKET}/teodoro/exchange_data"
>     )
>
>
> # 2. SILVER LAYER (Fails to infer that it depends on the Bronze layer above)
> @dp.materialized_view(name="teodoro_silver.user_data")
> def silver_user_data():
>     # Reading via spark.table hides the lineage from the dp engine
>     return spark.table("teodoro_raw.user_data")
>
>
> @dp.table(name="teodoro_silver.exchange")
> def silver_exchange():
>     # Reading via spark.readStream.table hides the lineage from the dp engine
>     return spark.readStream.table("teodoro_raw.exchange").withColumn(
>         "time", col("time").cast("timestamp")
>     )
> {code}
> *Expected Behavior*
> The Declarative Pipeline engine should parse or intercept `spark.table()` /
> `spark.readStream.table()` calls within the `@dp` decorators, or provide a
> mechanism to explicitly resolve dependencies, ensuring that the Bronze tables
> (`user_data`, `exchange`) are executed and populated *before* the Silver
> tables start running.
>
> *Actual Behavior*
> The pipeline starts executing Silver tasks first (e.g.,
> `teodoro_silver.user_data`), leading to execution failures because the
> underlying raw tables/views have not been initialized or updated yet by the
> framework.
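
To make the expected ordering concrete, here is a minimal sketch in plain
Python (no Spark needed). It is not how the SDP engine resolves its graph; it
only illustrates the ordering described under Expected Behavior. The `reads`
mapping and the `execution_order` helper are hypothetical names, and the
sketch assumes the two Bronze datasets resolve to `teodoro_raw.user_data` and
`teodoro_raw.exchange`, which the report implies but does not show.

```python
from graphlib import TopologicalSorter

# Hypothetical model of the pipeline in this report: each dataset name maps
# to the table identifiers its query function reads. The Bronze names assume
# the pipeline's default database is teodoro_raw (an assumption).
# Silver datasets are listed first on purpose.
reads = {
    "teodoro_silver.user_data": {"teodoro_raw.user_data"},
    "teodoro_silver.exchange": {"teodoro_raw.exchange"},
    "teodoro_raw.user_data": set(),
    "teodoro_raw.exchange": set(),
}


def execution_order(reads: dict[str, set[str]]) -> list[str]:
    """Return dataset names so every upstream dataset precedes its readers."""
    # Identifiers not defined by the pipeline are treated as external
    # sources and impose no ordering constraint.
    graph = {
        name: {src for src in sources if src in reads}
        for name, sources in reads.items()
    }
    # TopologicalSorter takes {node: predecessors}; static_order() yields
    # predecessors before the nodes that depend on them.
    return list(TopologicalSorter(graph).static_order())


order = execution_order(reads)
print(order)
```

The ordering here comes only from the identifier strings, not from the order
in which the datasets are declared. A circular reference between datasets
would raise `graphlib.CycleError`.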