Tomas Sedlak created SPARK-57539:
------------------------------------

             Summary: MERGE INTO over Iceberg temp-view source can fail with 
assertion failed: No plan for TableReference[...]
                 Key: SPARK-57539
                 URL: https://issues.apache.org/jira/browse/SPARK-57539
             Project: Spark
          Issue Type: Bug
          Components: Bug, SQL
    Affects Versions: 4.1.2
         Environment: OS: openSUSE Tumbleweed x86_64
Kernel: Linux 6.18.35-1-longterm
openjdk version "21.0.10" 2026-01-20 LTS
OpenJDK Runtime Environment Temurin-21.0.10+7 (build 21.0.10+7-LTS)
OpenJDK 64-Bit Server VM Temurin-21.0.10+7 (build 21.0.10+7-LTS, mixed mode, 
sharing)
            Reporter: Tomas Sedlak


h3. Spark version
 * 4.1.2

h3. Iceberg version
 * 1.11.0
 * Spark runtime package: 
org.apache.iceberg:iceberg-spark-runtime-4.1_2.13:1.11.0

h3. Repro

Self-contained repro:
from __future__ import annotationsimport osimport shutilimport tempfilefrom 
datetime import date, datetimefrom pathlib import Pathfrom pyspark.sql import 
SparkSessionfrom pyspark.sql import functions as Ffrom pyspark.sql import types 
as Tdef main() -> None:    warehouse_dir = 
Path(tempfile.mkdtemp(prefix="spark-iceberg-warehouse-"))    ivy_dir = 
Path(tempfile.mkdtemp(prefix="spark-ivy-cache-"))    
os.environ.setdefault("SPARK_LOCAL_IP", "127.0.0.1")    
os.environ.setdefault("SPARK_LOCAL_HOSTNAME", "localhost")    spark = (        
SparkSession.builder.master("local[2]")
        .appName("repro-no-plan-for-tablereference")
        .config("spark.jars.packages", 
"org.apache.iceberg:iceberg-spark-runtime-4.1_2.13:1.11.0")
        .config("spark.jars.ivy", str(ivy_dir))
        .config("spark.sql.catalog.iceberg", 
"org.apache.iceberg.spark.SparkCatalog")
        .config("spark.sql.defaultCatalog", "iceberg")
        .config("spark.sql.catalogImplementation", "in-memory")
        .config("spark.sql.catalog.iceberg.type", "hadoop")
        .config("spark.sql.catalog.iceberg.warehouse", str(warehouse_dir))
        .config("spark.sql.extensions", 
"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
        .config("spark.sql.shuffle.partitions", "4")
        .getOrCreate()
    )    table_name = "iceberg.temporal.no_plan_for_tablereference"row_schema = 
T.StructType(
        [            T.StructField("asset_id", T.StringType(), False),          
  T.StructField("state_valid_from", T.DateType(), False),            
T.StructField("state_valid_to", T.DateType(), False),            
T.StructField("customer_name", T.StringType(), True),            
T.StructField("state_hash", T.StringType(), False),            
T.StructField("generated_at", T.TimestampType(), False),
        ]
    )    try:        spark.sql("CREATE NAMESPACE IF NOT EXISTS 
iceberg.temporal")        spark.sql(            f"""            CREATE TABLE 
{table_name} (                asset_id STRING NOT NULL,                
state_valid_from DATE NOT NULL,                state_valid_to DATE NOT NULL,    
            customer_name STRING,                state_hash STRING NOT NULL,    
            generated_at TIMESTAMP NOT NULL            )            USING 
iceberg            TBLPROPERTIES ('format-version' = '3')            """        
)        spark.createDataFrame(
            [
                ("EAN-1", date(2024, 2, 1), date(2024, 2, 10), "keep", 
"hash-keep", datetime(2024, 2, 1)),
                ("EAN-1", date(2024, 2, 11), date(2024, 2, 20), "delete-me", 
"hash-delete", datetime(2024, 2, 11)),
            ],            schema=row_schema,
        ).writeTo(table_name).append()        replacement_state_df = 
spark.createDataFrame(
            [
                ("EAN-1", date(2024, 2, 1), date(2024, 2, 10), "updated", 
"hash-updated", datetime(2030, 1, 1)),
            ],            schema=row_schema,
        )        join_columns = ["asset_id", "state_valid_from", 
"state_valid_to"]        target_columns = row_schema.fieldNames()        
merge_source_df = (            spark.table(table_name)
            .alias("target")
            .join(replacement_state_df.alias("source"), on=join_columns, 
how="full_outer")
            .select(                F.coalesce(F.col("source.asset_id"), 
F.col("target.asset_id")).alias("asset_id"),                
F.coalesce(F.col("source.state_valid_from"), 
F.col("target.state_valid_from")).alias(                    "state_valid_from"  
              ),                F.coalesce(F.col("source.state_valid_to"), 
F.col("target.state_valid_to")).alias("state_valid_to"),                
*[F.col(f"source.{column}").alias(column) for column in target_columns if 
column not in join_columns],                
F.when(F.col("source.asset_id").isNull(), F.lit("delete"))
                .when(F.col("target.asset_id").isNull(), F.lit("insert"))
                .when(F.col("source.state_hash") == F.col("target.state_hash"), 
F.lit("noop"))
                .otherwise(F.lit("update"))
                .alias("_merge_operation"),
            )
        )        merge_source_df.createOrReplaceTempView("merge_source")        
spark.sql(            f"""            MERGE INTO {table_name} target            
USING merge_source source            ON target.asset_id = source.asset_id       
        AND target.state_valid_from = source.state_valid_from               AND 
target.state_valid_to = source.state_valid_to            WHEN MATCHED AND 
source._merge_operation = 'delete' THEN DELETE            WHEN MATCHED AND 
source._merge_operation = 'update' THEN                UPDATE SET               
     target.asset_id = source.asset_id,                    
target.state_valid_from = source.state_valid_from,                    
target.state_valid_to = source.state_valid_to,                    
target.customer_name = source.customer_name,                    
target.state_hash = source.state_hash,                    target.generated_at = 
source.generated_at            WHEN NOT MATCHED AND source._merge_operation = 
'insert' THEN                INSERT (asset_id, state_valid_from, 
state_valid_to, customer_name, state_hash, generated_at)                VALUES 
(                    source.asset_id,                    
source.state_valid_from,                    source.state_valid_to,              
      source.customer_name,                    source.state_hash,               
     source.generated_at                )            """        )    finally:   
     spark.sql(f"DROP TABLE IF EXISTS {table_name}")        spark.stop()        
shutil.rmtree(warehouse_dir, ignore_errors=True)        shutil.rmtree(ivy_dir, 
ignore_errors=True)if __name__ == "__main__":    main()
 
h3. Expected

{{MERGE INTO}} should complete successfully.

It should not fail with an internal planner assertion.
h3. Actual

Spark fails during planning with an internal error:
org.apache.spark.SparkException: [INTERNAL_ERROR] The Spark SQL phase planning failed with an internal error.
...
Caused by: java.lang.AssertionError: assertion failed: No plan for TableReference[asset_id#..., state_valid_from#..., state_valid_to#..., customer_name#..., state_hash#..., generated_at#...] iceberg.temporal.no_plan_for_tablereference
...
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
...
at org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan.compileSubquery(InsertAdaptiveSparkPlan.scala:167)
h3. Workaround:

Downgrading to Spark 4.0.3 solved the issue for me.
h3. Notes
 * The failure reproduces consistently with local Spark 4.1.2
 * The reduced shape seems to be:
 ** MERGE INTO Iceberg table
 ** USING temp view
 ** temp view lineage reads from the same target Iceberg table
 ** planner later trips on TableReference[...]
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to