Tomas Sedlak created SPARK-57539:
------------------------------------
Summary: MERGE INTO an Iceberg table using a temp-view source can fail with
"assertion failed: No plan for TableReference[...]"
Key: SPARK-57539
URL: https://issues.apache.org/jira/browse/SPARK-57539
Project: Spark
Issue Type: Bug
Components: Bug, SQL
Affects Versions: 4.1.2
Environment: OS: openSUSE Tumbleweed x86_64
Kernel: Linux 6.18.35-1-longterm
openjdk version "21.0.10" 2026-01-20 LTS
OpenJDK Runtime Environment Temurin-21.0.10+7 (build 21.0.10+7-LTS)
OpenJDK 64-Bit Server VM Temurin-21.0.10+7 (build 21.0.10+7-LTS, mixed mode,
sharing)
Reporter: Tomas Sedlak
h3. Spark version
* 4.1.2
h3. Iceberg version
* 1.11.0
* Spark runtime package:
org.apache.iceberg:iceberg-spark-runtime-4.1_2.13:1.11.0
h3. Repro
Self-contained repro:
from __future__ import annotationsimport osimport shutilimport tempfilefrom
datetime import date, datetimefrom pathlib import Pathfrom pyspark.sql import
SparkSessionfrom pyspark.sql import functions as Ffrom pyspark.sql import types
as Tdef main() -> None: warehouse_dir =
Path(tempfile.mkdtemp(prefix="spark-iceberg-warehouse-")) ivy_dir =
Path(tempfile.mkdtemp(prefix="spark-ivy-cache-"))
os.environ.setdefault("SPARK_LOCAL_IP", "127.0.0.1")
os.environ.setdefault("SPARK_LOCAL_HOSTNAME", "localhost") spark = (
SparkSession.builder.master("local[2]")
.appName("repro-no-plan-for-tablereference")
.config("spark.jars.packages",
"org.apache.iceberg:iceberg-spark-runtime-4.1_2.13:1.11.0")
.config("spark.jars.ivy", str(ivy_dir))
.config("spark.sql.catalog.iceberg",
"org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.defaultCatalog", "iceberg")
.config("spark.sql.catalogImplementation", "in-memory")
.config("spark.sql.catalog.iceberg.type", "hadoop")
.config("spark.sql.catalog.iceberg.warehouse", str(warehouse_dir))
.config("spark.sql.extensions",
"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
.config("spark.sql.shuffle.partitions", "4")
.getOrCreate()
) table_name = "iceberg.temporal.no_plan_for_tablereference"row_schema =
T.StructType(
[ T.StructField("asset_id", T.StringType(), False),
T.StructField("state_valid_from", T.DateType(), False),
T.StructField("state_valid_to", T.DateType(), False),
T.StructField("customer_name", T.StringType(), True),
T.StructField("state_hash", T.StringType(), False),
T.StructField("generated_at", T.TimestampType(), False),
]
) try: spark.sql("CREATE NAMESPACE IF NOT EXISTS
iceberg.temporal") spark.sql( f""" CREATE TABLE
{table_name} ( asset_id STRING NOT NULL,
state_valid_from DATE NOT NULL, state_valid_to DATE NOT NULL,
customer_name STRING, state_hash STRING NOT NULL,
generated_at TIMESTAMP NOT NULL ) USING
iceberg TBLPROPERTIES ('format-version' = '3') """
) spark.createDataFrame(
[
("EAN-1", date(2024, 2, 1), date(2024, 2, 10), "keep",
"hash-keep", datetime(2024, 2, 1)),
("EAN-1", date(2024, 2, 11), date(2024, 2, 20), "delete-me",
"hash-delete", datetime(2024, 2, 11)),
], schema=row_schema,
).writeTo(table_name).append() replacement_state_df =
spark.createDataFrame(
[
("EAN-1", date(2024, 2, 1), date(2024, 2, 10), "updated",
"hash-updated", datetime(2030, 1, 1)),
], schema=row_schema,
) join_columns = ["asset_id", "state_valid_from",
"state_valid_to"] target_columns = row_schema.fieldNames()
merge_source_df = ( spark.table(table_name)
.alias("target")
.join(replacement_state_df.alias("source"), on=join_columns,
how="full_outer")
.select( F.coalesce(F.col("source.asset_id"),
F.col("target.asset_id")).alias("asset_id"),
F.coalesce(F.col("source.state_valid_from"),
F.col("target.state_valid_from")).alias( "state_valid_from"
), F.coalesce(F.col("source.state_valid_to"),
F.col("target.state_valid_to")).alias("state_valid_to"),
*[F.col(f"source.{column}").alias(column) for column in target_columns if
column not in join_columns],
F.when(F.col("source.asset_id").isNull(), F.lit("delete"))
.when(F.col("target.asset_id").isNull(), F.lit("insert"))
.when(F.col("source.state_hash") == F.col("target.state_hash"),
F.lit("noop"))
.otherwise(F.lit("update"))
.alias("_merge_operation"),
)
) merge_source_df.createOrReplaceTempView("merge_source")
spark.sql( f""" MERGE INTO {table_name} target
USING merge_source source ON target.asset_id = source.asset_id
AND target.state_valid_from = source.state_valid_from AND
target.state_valid_to = source.state_valid_to WHEN MATCHED AND
source._merge_operation = 'delete' THEN DELETE WHEN MATCHED AND
source._merge_operation = 'update' THEN UPDATE SET
target.asset_id = source.asset_id,
target.state_valid_from = source.state_valid_from,
target.state_valid_to = source.state_valid_to,
target.customer_name = source.customer_name,
target.state_hash = source.state_hash, target.generated_at =
source.generated_at WHEN NOT MATCHED AND source._merge_operation =
'insert' THEN INSERT (asset_id, state_valid_from,
state_valid_to, customer_name, state_hash, generated_at) VALUES
( source.asset_id,
source.state_valid_from, source.state_valid_to,
source.customer_name, source.state_hash,
source.generated_at ) """ ) finally:
spark.sql(f"DROP TABLE IF EXISTS {table_name}") spark.stop()
shutil.rmtree(warehouse_dir, ignore_errors=True) shutil.rmtree(ivy_dir,
ignore_errors=True)if __name__ == "__main__": main()
h3. Expected
{{MERGE INTO}} should complete successfully.
It should not fail with an internal planner assertion.
h3. Actual
Spark fails during planning with an internal error:
org.apache.spark.SparkException: [INTERNAL_ERROR] The Spark SQL phase planning failed with an internal error.
...
Caused by: java.lang.AssertionError: assertion failed: No plan for TableReference[asset_id#..., state_valid_from#..., state_valid_to#..., customer_name#..., state_hash#..., generated_at#...] iceberg.temporal.no_plan_for_tablereference
...
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
...
	at org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan.compileSubquery(InsertAdaptiveSparkPlan.scala:167)
h3. Workaround
Downgrading to Spark 4.0.3 resolved the issue for me.
h3. Notes
* The failure reproduces consistently with local Spark 4.1.2
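* The minimal pattern that triggers it appears to be:
** MERGE INTO an Iceberg table
** USING a temp view as the source
** the temp view's lineage reads from the same target Iceberg table (in the repro, via a full outer join against it)
** the planner then fails on TableReference[...] while compiling a subquery in InsertAdaptiveSparkPlan (see the stack trace above)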
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]