[ 
https://issues.apache.org/jira/browse/SPARK-57539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tomas Sedlak updated SPARK-57539:
---------------------------------
    Description: 
h3. Spark version
 * 4.1.2

h3. Iceberg version
 * 1.11.0
 * Spark runtime package: {{org.apache.iceberg:iceberg-spark-runtime-4.1_2.13:1.11.0}}

h3. Repro

 
{code:python}
from __future__ import annotations

import os
import shutil
import tempfile

from datetime import date, datetime
from pathlib import Path
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T


def main() -> None:
    """Reproduce SPARK-57539: MERGE INTO an Iceberg table from a temp view over that same table.

    Steps performed:
      1. Start a local Spark session with the Iceberg runtime and a Hadoop
         catalog named ``iceberg`` rooted in a fresh temporary warehouse.
      2. Create a format-version 3 Iceberg table and append two rows.
      3. Full-outer-join the table against a one-row replacement DataFrame on
         the three key columns, tagging every joined row with a
         ``_merge_operation`` of ``delete`` / ``insert`` / ``noop`` / ``update``.
      4. Register that result as the temp view ``merge_source`` and run
         ``MERGE INTO`` against the original table.

    As reported, on Spark 4.1.2 step 4 fails during planning with
    ``AssertionError: assertion failed: No plan for TableReference[...]``;
    downgrading to Spark 4.0.3 avoids the failure.

    Cleanup (drop table, stop the session, delete the warehouse and Ivy
    directories) runs even if session start-up or the MERGE itself fails.
    """
    warehouse_dir = Path(tempfile.mkdtemp(prefix="spark-iceberg-warehouse-"))
    # Private Ivy directory used to resolve spark.jars.packages; removed at the end.
    ivy_dir = Path(tempfile.mkdtemp(prefix="spark-ivy-cache-"))
    # Default Spark to loopback unless the caller already set these variables.
    os.environ.setdefault("SPARK_LOCAL_IP", "127.0.0.1")
    os.environ.setdefault("SPARK_LOCAL_HOSTNAME", "localhost")

    table_name = "iceberg.temporal.no_plan_for_tablereference"
    row_schema = T.StructType(
        [
            T.StructField("asset_id", T.StringType(), False),
            T.StructField("state_valid_from", T.DateType(), False),
            T.StructField("state_valid_to", T.DateType(), False),
            T.StructField("customer_name", T.StringType(), True),
            T.StructField("state_hash", T.StringType(), False),
            T.StructField("generated_at", T.TimestampType(), False),
        ]
    )

    try:
        # Created inside the outer try so the temp directories are removed
        # even if session start-up (e.g. package resolution) fails.
        spark = (
            SparkSession.builder.master("local[2]")
            .appName("repro-no-plan-for-tablereference")
            .config(
                "spark.jars.packages",
                "org.apache.iceberg:iceberg-spark-runtime-4.1_2.13:1.11.0",
            )
            .config("spark.jars.ivy", str(ivy_dir))
            .config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
            .config("spark.sql.defaultCatalog", "iceberg")
            .config("spark.sql.catalogImplementation", "in-memory")
            .config("spark.sql.catalog.iceberg.type", "hadoop")
            .config("spark.sql.catalog.iceberg.warehouse", str(warehouse_dir))
            .config(
                "spark.sql.extensions",
                "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
            )
            .config("spark.sql.shuffle.partitions", "4")
            .getOrCreate()
        )
        try:
            spark.sql("CREATE NAMESPACE IF NOT EXISTS iceberg.temporal")
            spark.sql(
                f"""
                CREATE TABLE {table_name} (
                    asset_id STRING NOT NULL,
                    state_valid_from DATE NOT NULL,
                    state_valid_to DATE NOT NULL,
                    customer_name STRING,
                    state_hash STRING NOT NULL,
                    generated_at TIMESTAMP NOT NULL
                )
                USING iceberg
                TBLPROPERTIES ('format-version' = '3')
                """
            )

            # Seed rows: the first matches the replacement row's keys (-> update),
            # the second has no counterpart in the replacement data (-> delete).
            spark.createDataFrame(
                [
                    ("EAN-1", date(2024, 2, 1), date(2024, 2, 10), "keep", "hash-keep", datetime(2024, 2, 1)),
                    ("EAN-1", date(2024, 2, 11), date(2024, 2, 20), "delete-me", "hash-delete", datetime(2024, 2, 11)),
                ],
                schema=row_schema,
            ).writeTo(table_name).append()

            replacement_state_df = spark.createDataFrame(
                [
                    ("EAN-1", date(2024, 2, 1), date(2024, 2, 10), "updated", "hash-updated", datetime(2030, 1, 1)),
                ],
                schema=row_schema,
            )

            join_columns = ["asset_id", "state_valid_from", "state_valid_to"]
            target_columns = row_schema.fieldNames()
            # Reading the target table here puts it into the MERGE source's
            # lineage -- the shape the report identifies as the trigger.
            merge_source_df = (
                spark.table(table_name)
                .alias("target")
                .join(replacement_state_df.alias("source"), on=join_columns, how="full_outer")
                .select(
                    # Key columns come from whichever side of the outer join is present.
                    F.coalesce(F.col("source.asset_id"), F.col("target.asset_id")).alias("asset_id"),
                    F.coalesce(F.col("source.state_valid_from"), F.col("target.state_valid_from")).alias(
                        "state_valid_from"
                    ),
                    F.coalesce(F.col("source.state_valid_to"), F.col("target.state_valid_to")).alias("state_valid_to"),
                    # Non-key columns are taken from the source side only.
                    *[F.col(f"source.{column}").alias(column) for column in target_columns if column not in join_columns],
                    # Target-only rows -> delete; source-only -> insert;
                    # unchanged hash -> noop; otherwise -> update.
                    F.when(F.col("source.asset_id").isNull(), F.lit("delete"))
                    .when(F.col("target.asset_id").isNull(), F.lit("insert"))
                    .when(F.col("source.state_hash") == F.col("target.state_hash"), F.lit("noop"))
                    .otherwise(F.lit("update"))
                    .alias("_merge_operation"),
                )
            )
            merge_source_df.createOrReplaceTempView("merge_source")

            # Reported to fail on Spark 4.1.2 with "No plan for TableReference[...]".
            spark.sql(
                f"""
                MERGE INTO {table_name} target
                USING merge_source source
                ON target.asset_id = source.asset_id
                   AND target.state_valid_from = source.state_valid_from
                   AND target.state_valid_to = source.state_valid_to
                WHEN MATCHED AND source._merge_operation = 'delete' THEN DELETE
                WHEN MATCHED AND source._merge_operation = 'update' THEN
                    UPDATE SET
                        target.asset_id = source.asset_id,
                        target.state_valid_from = source.state_valid_from,
                        target.state_valid_to = source.state_valid_to,
                        target.customer_name = source.customer_name,
                        target.state_hash = source.state_hash,
                        target.generated_at = source.generated_at
                WHEN NOT MATCHED AND source._merge_operation = 'insert' THEN
                    INSERT (asset_id, state_valid_from, state_valid_to, customer_name, state_hash, generated_at)
                    VALUES (
                        source.asset_id,
                        source.state_valid_from,
                        source.state_valid_to,
                        source.customer_name,
                        source.state_hash,
                        source.generated_at
                    )
                """
            )
        finally:
            # Nested so the session is always stopped even if DROP TABLE fails.
            try:
                spark.sql(f"DROP TABLE IF EXISTS {table_name}")
            finally:
                spark.stop()
    finally:
        shutil.rmtree(warehouse_dir, ignore_errors=True)
        shutil.rmtree(ivy_dir, ignore_errors=True)
if __name__ == "__main__":
    main()
{code}
 
h3. Expected

{{MERGE INTO}} should complete successfully.

It should not fail with an internal planner assertion.
h3. Actual

Spark fails during planning with an internal error:
{noformat}
org.apache.spark.SparkException: [INTERNAL_ERROR] The Spark SQL phase planning failed with an internal error.
...
Caused by: java.lang.AssertionError: assertion failed: No plan for TableReference[asset_id#..., state_valid_from#..., state_valid_to#..., customer_name#..., state_hash#..., generated_at#...|#..., state_valid_from#..., state_valid_to#..., customer_name#..., state_hash#..., generated_at#...] iceberg.temporal.no_plan_for_tablereference
...
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
...
at org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan.compileSubquery(InsertAdaptiveSparkPlan.scala:167)
{noformat}
h3. Workaround

Downgrading to Spark 4.0.3 resolved the issue for me.
h3. Notes
 * The failure reproduces consistently with local Spark 4.1.2.
 * The reduced shape seems to be:
 ** {{MERGE INTO}} an Iceberg table
 ** {{USING}} a temp view
 ** the temp view's lineage reads from the same target Iceberg table
 ** the planner later fails on {{TableReference[...]}}
 * The failing frame is {{InsertAdaptiveSparkPlan.compileSubquery}}.
 

  was:
h3. Spark version
 * 4.1.2

h3. Iceberg version
 * 1.11.0
 * Spark runtime package: 
org.apache.iceberg:iceberg-spark-runtime-4.1_2.13:1.11.0

h3. Repro

 
{code:java}
from __future__ import annotationsimport os
import shutil
import tempfile
from datetime import date, datetime
from pathlib import Pathfrom pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
def main() -> None:
    warehouse_dir = Path(tempfile.mkdtemp(prefix="spark-iceberg-warehouse-"))
    ivy_dir = Path(tempfile.mkdtemp(prefix="spark-ivy-cache-"))
    os.environ.setdefault("SPARK_LOCAL_IP", "127.0.0.1")
    os.environ.setdefault("SPARK_LOCAL_HOSTNAME", "localhost")    spark = (
        SparkSession.builder.master("local[2]")
        .appName("repro-no-plan-for-tablereference")
        .config("spark.jars.packages", 
"org.apache.iceberg:iceberg-spark-runtime-4.1_2.13:1.11.0")
        .config("spark.jars.ivy", str(ivy_dir))
        .config("spark.sql.catalog.iceberg", 
"org.apache.iceberg.spark.SparkCatalog")
        .config("spark.sql.defaultCatalog", "iceberg")
        .config("spark.sql.catalogImplementation", "in-memory")
        .config("spark.sql.catalog.iceberg.type", "hadoop")
        .config("spark.sql.catalog.iceberg.warehouse", str(warehouse_dir))
        .config("spark.sql.extensions", 
"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
        .config("spark.sql.shuffle.partitions", "4")
        .getOrCreate()
    )    table_name = "iceberg.temporal.no_plan_for_tablereference"
    row_schema = T.StructType(
        [
            T.StructField("asset_id", T.StringType(), False),
            T.StructField("state_valid_from", T.DateType(), False),
            T.StructField("state_valid_to", T.DateType(), False),
            T.StructField("customer_name", T.StringType(), True),
            T.StructField("state_hash", T.StringType(), False),
            T.StructField("generated_at", T.TimestampType(), False),
        ]
    )    try:
        spark.sql("CREATE NAMESPACE IF NOT EXISTS iceberg.temporal")
        spark.sql(
            f"""
            CREATE TABLE {table_name} (
                asset_id STRING NOT NULL,
                state_valid_from DATE NOT NULL,
                state_valid_to DATE NOT NULL,
                customer_name STRING,
                state_hash STRING NOT NULL,
                generated_at TIMESTAMP NOT NULL
            )
            USING iceberg
            TBLPROPERTIES ('format-version' = '3')
            """
        )        spark.createDataFrame(
            [
                ("EAN-1", date(2024, 2, 1), date(2024, 2, 10), "keep", 
"hash-keep", datetime(2024, 2, 1)),
                ("EAN-1", date(2024, 2, 11), date(2024, 2, 20), "delete-me", 
"hash-delete", datetime(2024, 2, 11)),
            ],
            schema=row_schema,
        ).writeTo(table_name).append()        replacement_state_df = 
spark.createDataFrame(
            [
                ("EAN-1", date(2024, 2, 1), date(2024, 2, 10), "updated", 
"hash-updated", datetime(2030, 1, 1)),
            ],
            schema=row_schema,
        )        join_columns = ["asset_id", "state_valid_from", 
"state_valid_to"]
        target_columns = row_schema.fieldNames()
        merge_source_df = (
            spark.table(table_name)
            .alias("target")
            .join(replacement_state_df.alias("source"), on=join_columns, 
how="full_outer")
            .select(
                F.coalesce(F.col("source.asset_id"), 
F.col("target.asset_id")).alias("asset_id"),
                F.coalesce(F.col("source.state_valid_from"), 
F.col("target.state_valid_from")).alias(
                    "state_valid_from"
                ),
                F.coalesce(F.col("source.state_valid_to"), 
F.col("target.state_valid_to")).alias("state_valid_to"),
                *[F.col(f"source.{column}").alias(column) for column in 
target_columns if column not in join_columns],
                F.when(F.col("source.asset_id").isNull(), F.lit("delete"))
                .when(F.col("target.asset_id").isNull(), F.lit("insert"))
                .when(F.col("source.state_hash") == F.col("target.state_hash"), 
F.lit("noop"))
                .otherwise(F.lit("update"))
                .alias("_merge_operation"),
            )
        )
        merge_source_df.createOrReplaceTempView("merge_source")        
spark.sql(
            f"""
            MERGE INTO {table_name} target
            USING merge_source source
            ON target.asset_id = source.asset_id
               AND target.state_valid_from = source.state_valid_from
               AND target.state_valid_to = source.state_valid_to
            WHEN MATCHED AND source._merge_operation = 'delete' THEN DELETE
            WHEN MATCHED AND source._merge_operation = 'update' THEN
                UPDATE SET
                    target.asset_id = source.asset_id,
                    target.state_valid_from = source.state_valid_from,
                    target.state_valid_to = source.state_valid_to,
                    target.customer_name = source.customer_name,
                    target.state_hash = source.state_hash,
                    target.generated_at = source.generated_at
            WHEN NOT MATCHED AND source._merge_operation = 'insert' THEN
                INSERT (asset_id, state_valid_from, state_valid_to, 
customer_name, state_hash, generated_at)
                VALUES (
                    source.asset_id,
                    source.state_valid_from,
                    source.state_valid_to,
                    source.customer_name,
                    source.state_hash,
                    source.generated_at
                )
            """
        )
    finally:
        spark.sql(f"DROP TABLE IF EXISTS {table_name}")
        spark.stop()
        shutil.rmtree(warehouse_dir, ignore_errors=True)
        shutil.rmtree(ivy_dir, ignore_errors=True)
if __name__ == "__main__":
    main() {code}
 
h3. Expected

{{MERGE INTO}} should complete successfully.

It should not fail with an internal planner assertion.
h3. Actual

Spark fails during planning with an internal error:
org.apache.spark.SparkException: [INTERNAL_ERROR] The Spark SQL phase planning 
failed with an internal error.
...
Caused by: java.lang.AssertionError: assertion failed: No plan for 
TableReference[asset_id#..., state_valid_from#..., state_valid_to#..., 
customer_name#..., state_hash#..., generated_at#...|#..., state_valid_from#..., 
state_valid_to#..., customer_name#..., state_hash#..., generated_at#...] 
iceberg.temporal.no_plan_for_tablereference
...
at 
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
...
at 
org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan.compileSubquery(InsertAdaptiveSparkPlan.scala:167)\{{}}
h3. Workaround:

Downgrade to spark 4.0.3 solved the issue for me.
h3. Notes
 * The failure reproduces consistently with local Spark 4.1.2
 * The reduced shape seems to be:
 ** MERGE INTO Iceberg table
 ** USING temp view
 ** temp view lineage reads from the same target Iceberg table
 ** planner later trips on TableReference[...]
 


> MERGE INTO over Iceberg temp-view source can fail with assertion failed: No 
> plan for TableReference[...]
> --------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-57539
>                 URL: https://issues.apache.org/jira/browse/SPARK-57539
>             Project: Spark
>          Issue Type: Bug
>          Components: Bug, SQL
>    Affects Versions: 4.1.2
>         Environment: OS: openSUSE Tumbleweed x86_64
> Kernel: Linux 6.18.35-1-longterm
> openjdk version "21.0.10" 2026-01-20 LTS
> OpenJDK Runtime Environment Temurin-21.0.10+7 (build 21.0.10+7-LTS)
> OpenJDK 64-Bit Server VM Temurin-21.0.10+7 (build 21.0.10+7-LTS, mixed mode, 
> sharing)
>            Reporter: Tomas Sedlak
>            Priority: Minor
>         Attachments: repro.py
>
>
> h3. Spark version
>  * 4.1.2
> h3. Iceberg version
>  * 1.11.0
>  * Spark runtime package: 
> org.apache.iceberg:iceberg-spark-runtime-4.1_2.13:1.11.0
> h3. Repro
>  
> {code:java}
> from __future__ import annotations
> import os
> import shutil
> import tempfile
> from datetime import date, datetime
> from pathlib import Path
> from pyspark.sql import SparkSession
> from pyspark.sql import functions as F
> from pyspark.sql import types as T
> def main() -> None:
>     warehouse_dir = Path(tempfile.mkdtemp(prefix="spark-iceberg-warehouse-"))
>     ivy_dir = Path(tempfile.mkdtemp(prefix="spark-ivy-cache-"))
>     os.environ.setdefault("SPARK_LOCAL_IP", "127.0.0.1")
>     os.environ.setdefault("SPARK_LOCAL_HOSTNAME", "localhost")    spark = (
>         SparkSession.builder.master("local[2]")
>         .appName("repro-no-plan-for-tablereference")
>         .config("spark.jars.packages", 
> "org.apache.iceberg:iceberg-spark-runtime-4.1_2.13:1.11.0")
>         .config("spark.jars.ivy", str(ivy_dir))
>         .config("spark.sql.catalog.iceberg", 
> "org.apache.iceberg.spark.SparkCatalog")
>         .config("spark.sql.defaultCatalog", "iceberg")
>         .config("spark.sql.catalogImplementation", "in-memory")
>         .config("spark.sql.catalog.iceberg.type", "hadoop")
>         .config("spark.sql.catalog.iceberg.warehouse", str(warehouse_dir))
>         .config("spark.sql.extensions", 
> "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
>         .config("spark.sql.shuffle.partitions", "4")
>         .getOrCreate()
>     )    table_name = "iceberg.temporal.no_plan_for_tablereference"
>     row_schema = T.StructType(
>         [
>             T.StructField("asset_id", T.StringType(), False),
>             T.StructField("state_valid_from", T.DateType(), False),
>             T.StructField("state_valid_to", T.DateType(), False),
>             T.StructField("customer_name", T.StringType(), True),
>             T.StructField("state_hash", T.StringType(), False),
>             T.StructField("generated_at", T.TimestampType(), False),
>         ]
>     )    try:
>         spark.sql("CREATE NAMESPACE IF NOT EXISTS iceberg.temporal")
>         spark.sql(
>             f"""
>             CREATE TABLE {table_name} (
>                 asset_id STRING NOT NULL,
>                 state_valid_from DATE NOT NULL,
>                 state_valid_to DATE NOT NULL,
>                 customer_name STRING,
>                 state_hash STRING NOT NULL,
>                 generated_at TIMESTAMP NOT NULL
>             )
>             USING iceberg
>             TBLPROPERTIES ('format-version' = '3')
>             """
>         )        spark.createDataFrame(
>             [
>                 ("EAN-1", date(2024, 2, 1), date(2024, 2, 10), "keep", 
> "hash-keep", datetime(2024, 2, 1)),
>                 ("EAN-1", date(2024, 2, 11), date(2024, 2, 20), "delete-me", 
> "hash-delete", datetime(2024, 2, 11)),
>             ],
>             schema=row_schema,
>         ).writeTo(table_name).append()        replacement_state_df = 
> spark.createDataFrame(
>             [
>                 ("EAN-1", date(2024, 2, 1), date(2024, 2, 10), "updated", 
> "hash-updated", datetime(2030, 1, 1)),
>             ],
>             schema=row_schema,
>         )        join_columns = ["asset_id", "state_valid_from", 
> "state_valid_to"]
>         target_columns = row_schema.fieldNames()
>         merge_source_df = (
>             spark.table(table_name)
>             .alias("target")
>             .join(replacement_state_df.alias("source"), on=join_columns, 
> how="full_outer")
>             .select(
>                 F.coalesce(F.col("source.asset_id"), 
> F.col("target.asset_id")).alias("asset_id"),
>                 F.coalesce(F.col("source.state_valid_from"), 
> F.col("target.state_valid_from")).alias(
>                     "state_valid_from"
>                 ),
>                 F.coalesce(F.col("source.state_valid_to"), 
> F.col("target.state_valid_to")).alias("state_valid_to"),
>                 *[F.col(f"source.{column}").alias(column) for column in 
> target_columns if column not in join_columns],
>                 F.when(F.col("source.asset_id").isNull(), F.lit("delete"))
>                 .when(F.col("target.asset_id").isNull(), F.lit("insert"))
>                 .when(F.col("source.state_hash") == 
> F.col("target.state_hash"), F.lit("noop"))
>                 .otherwise(F.lit("update"))
>                 .alias("_merge_operation"),
>             )
>         )
>         merge_source_df.createOrReplaceTempView("merge_source")        
> spark.sql(
>             f"""
>             MERGE INTO {table_name} target
>             USING merge_source source
>             ON target.asset_id = source.asset_id
>                AND target.state_valid_from = source.state_valid_from
>                AND target.state_valid_to = source.state_valid_to
>             WHEN MATCHED AND source._merge_operation = 'delete' THEN DELETE
>             WHEN MATCHED AND source._merge_operation = 'update' THEN
>                 UPDATE SET
>                     target.asset_id = source.asset_id,
>                     target.state_valid_from = source.state_valid_from,
>                     target.state_valid_to = source.state_valid_to,
>                     target.customer_name = source.customer_name,
>                     target.state_hash = source.state_hash,
>                     target.generated_at = source.generated_at
>             WHEN NOT MATCHED AND source._merge_operation = 'insert' THEN
>                 INSERT (asset_id, state_valid_from, state_valid_to, 
> customer_name, state_hash, generated_at)
>                 VALUES (
>                     source.asset_id,
>                     source.state_valid_from,
>                     source.state_valid_to,
>                     source.customer_name,
>                     source.state_hash,
>                     source.generated_at
>                 )
>             """
>         )
>     finally:
>         spark.sql(f"DROP TABLE IF EXISTS {table_name}")
>         spark.stop()
>         shutil.rmtree(warehouse_dir, ignore_errors=True)
>         shutil.rmtree(ivy_dir, ignore_errors=True)
> if __name__ == "__main__":
>     main(){code}
>  
> h3. Expected
> {{MERGE INTO}} should complete successfully.
> It should not fail with an internal planner assertion.
> h3. Actual
> Spark fails during planning with an internal error:
> org.apache.spark.SparkException: [INTERNAL_ERROR] The Spark SQL phase 
> planning failed with an internal error.
> ...
> Caused by: java.lang.AssertionError: assertion failed: No plan for 
> TableReference[asset_id#..., state_valid_from#..., state_valid_to#..., 
> customer_name#..., state_hash#..., generated_at#...|#..., 
> state_valid_from#..., state_valid_to#..., customer_name#..., state_hash#..., 
> generated_at#...] iceberg.temporal.no_plan_for_tablereference
> ...
> at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
> ...
> at 
> org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan.compileSubquery(InsertAdaptiveSparkPlan.scala:167)\{{}}
> h3. Workaround:
> Downgrade to spark 4.0.3 solved the issue for me.
> h3. Notes
>  * The failure reproduces consistently with local Spark 4.1.2
>  * The reduced shape seems to be:
>  ** MERGE INTO Iceberg table
>  ** USING temp view
>  ** temp view lineage reads from the same target Iceberg table
>  ** planner later trips on TableReference[...]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to