[
https://issues.apache.org/jira/browse/SPARK-57539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Tomas Sedlak updated SPARK-57539:
---------------------------------
Description:
h3. Spark version
* 4.1.2
h3. Iceberg version
* 1.11.0
* Spark runtime package:
org.apache.iceberg:iceberg-spark-runtime-4.1_2.13:1.11.0
h3. Repro
{code:python}
from __future__ import annotations
import os
import shutil
import tempfile
from datetime import date, datetime
from pathlib import Path
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
def main() -> None:
warehouse_dir = Path(tempfile.mkdtemp(prefix="spark-iceberg-warehouse-"))
ivy_dir = Path(tempfile.mkdtemp(prefix="spark-ivy-cache-"))
os.environ.setdefault("SPARK_LOCAL_IP", "127.0.0.1")
os.environ.setdefault("SPARK_LOCAL_HOSTNAME", "localhost") spark = (
SparkSession.builder.master("local[2]")
.appName("repro-no-plan-for-tablereference")
.config("spark.jars.packages",
"org.apache.iceberg:iceberg-spark-runtime-4.1_2.13:1.11.0")
.config("spark.jars.ivy", str(ivy_dir))
.config("spark.sql.catalog.iceberg",
"org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.defaultCatalog", "iceberg")
.config("spark.sql.catalogImplementation", "in-memory")
.config("spark.sql.catalog.iceberg.type", "hadoop")
.config("spark.sql.catalog.iceberg.warehouse", str(warehouse_dir))
.config("spark.sql.extensions",
"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
.config("spark.sql.shuffle.partitions", "4")
.getOrCreate()
) table_name = "iceberg.temporal.no_plan_for_tablereference"
row_schema = T.StructType(
[
T.StructField("asset_id", T.StringType(), False),
T.StructField("state_valid_from", T.DateType(), False),
T.StructField("state_valid_to", T.DateType(), False),
T.StructField("customer_name", T.StringType(), True),
T.StructField("state_hash", T.StringType(), False),
T.StructField("generated_at", T.TimestampType(), False),
]
) try:
spark.sql("CREATE NAMESPACE IF NOT EXISTS iceberg.temporal")
spark.sql(
f"""
CREATE TABLE {table_name} (
asset_id STRING NOT NULL,
state_valid_from DATE NOT NULL,
state_valid_to DATE NOT NULL,
customer_name STRING,
state_hash STRING NOT NULL,
generated_at TIMESTAMP NOT NULL
)
USING iceberg
TBLPROPERTIES ('format-version' = '3')
"""
) spark.createDataFrame(
[
("EAN-1", date(2024, 2, 1), date(2024, 2, 10), "keep",
"hash-keep", datetime(2024, 2, 1)),
("EAN-1", date(2024, 2, 11), date(2024, 2, 20), "delete-me",
"hash-delete", datetime(2024, 2, 11)),
],
schema=row_schema,
).writeTo(table_name).append() replacement_state_df =
spark.createDataFrame(
[
("EAN-1", date(2024, 2, 1), date(2024, 2, 10), "updated",
"hash-updated", datetime(2030, 1, 1)),
],
schema=row_schema,
) join_columns = ["asset_id", "state_valid_from",
"state_valid_to"]
target_columns = row_schema.fieldNames()
merge_source_df = (
spark.table(table_name)
.alias("target")
.join(replacement_state_df.alias("source"), on=join_columns,
how="full_outer")
.select(
F.coalesce(F.col("source.asset_id"),
F.col("target.asset_id")).alias("asset_id"),
F.coalesce(F.col("source.state_valid_from"),
F.col("target.state_valid_from")).alias(
"state_valid_from"
),
F.coalesce(F.col("source.state_valid_to"),
F.col("target.state_valid_to")).alias("state_valid_to"),
*[F.col(f"source.{column}").alias(column) for column in
target_columns if column not in join_columns],
F.when(F.col("source.asset_id").isNull(), F.lit("delete"))
.when(F.col("target.asset_id").isNull(), F.lit("insert"))
.when(F.col("source.state_hash") == F.col("target.state_hash"),
F.lit("noop"))
.otherwise(F.lit("update"))
.alias("_merge_operation"),
)
)
merge_source_df.createOrReplaceTempView("merge_source")
spark.sql(
f"""
MERGE INTO {table_name} target
USING merge_source source
ON target.asset_id = source.asset_id
AND target.state_valid_from = source.state_valid_from
AND target.state_valid_to = source.state_valid_to
WHEN MATCHED AND source._merge_operation = 'delete' THEN DELETE
WHEN MATCHED AND source._merge_operation = 'update' THEN
UPDATE SET
target.asset_id = source.asset_id,
target.state_valid_from = source.state_valid_from,
target.state_valid_to = source.state_valid_to,
target.customer_name = source.customer_name,
target.state_hash = source.state_hash,
target.generated_at = source.generated_at
WHEN NOT MATCHED AND source._merge_operation = 'insert' THEN
INSERT (asset_id, state_valid_from, state_valid_to,
customer_name, state_hash, generated_at)
VALUES (
source.asset_id,
source.state_valid_from,
source.state_valid_to,
source.customer_name,
source.state_hash,
source.generated_at
)
"""
)
finally:
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
spark.stop()
shutil.rmtree(warehouse_dir, ignore_errors=True)
shutil.rmtree(ivy_dir, ignore_errors=True)
if __name__ == "__main__":
main(){code}
h3. Expected
{{MERGE INTO}} should complete successfully.
It should not fail with an internal planner assertion.
h3. Actual
Spark fails during planning with an internal error:
org.apache.spark.SparkException: [INTERNAL_ERROR] The Spark SQL phase planning
failed with an internal error.
...
Caused by: java.lang.AssertionError: assertion failed: No plan for
TableReference[asset_id#..., state_valid_from#..., state_valid_to#...,
customer_name#..., state_hash#..., generated_at#...]
iceberg.temporal.no_plan_for_tablereference
...
at
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
...
at
org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan.compileSubquery(InsertAdaptiveSparkPlan.scala:167)
h3. Workaround
Downgrading to Spark 4.0.3 avoids the failure in my environment.
h3. Notes
* The failure reproduces consistently with local Spark 4.1.2
* The reduced shape seems to be:
** MERGE INTO Iceberg table
** USING temp view
** temp view lineage reads from the same target Iceberg table
** planner later trips on TableReference[...]
was:
h3. Spark version
* 4.1.2
h3. Iceberg version
* 1.11.0
* Spark runtime package:
org.apache.iceberg:iceberg-spark-runtime-4.1_2.13:1.11.0
h3. Repro
{code:python}
from __future__ import annotationsimport os
import shutil
import tempfile
from datetime import date, datetime
from pathlib import Pathfrom pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
def main() -> None:
warehouse_dir = Path(tempfile.mkdtemp(prefix="spark-iceberg-warehouse-"))
ivy_dir = Path(tempfile.mkdtemp(prefix="spark-ivy-cache-"))
os.environ.setdefault("SPARK_LOCAL_IP", "127.0.0.1")
os.environ.setdefault("SPARK_LOCAL_HOSTNAME", "localhost") spark = (
SparkSession.builder.master("local[2]")
.appName("repro-no-plan-for-tablereference")
.config("spark.jars.packages",
"org.apache.iceberg:iceberg-spark-runtime-4.1_2.13:1.11.0")
.config("spark.jars.ivy", str(ivy_dir))
.config("spark.sql.catalog.iceberg",
"org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.defaultCatalog", "iceberg")
.config("spark.sql.catalogImplementation", "in-memory")
.config("spark.sql.catalog.iceberg.type", "hadoop")
.config("spark.sql.catalog.iceberg.warehouse", str(warehouse_dir))
.config("spark.sql.extensions",
"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
.config("spark.sql.shuffle.partitions", "4")
.getOrCreate()
) table_name = "iceberg.temporal.no_plan_for_tablereference"
row_schema = T.StructType(
[
T.StructField("asset_id", T.StringType(), False),
T.StructField("state_valid_from", T.DateType(), False),
T.StructField("state_valid_to", T.DateType(), False),
T.StructField("customer_name", T.StringType(), True),
T.StructField("state_hash", T.StringType(), False),
T.StructField("generated_at", T.TimestampType(), False),
]
) try:
spark.sql("CREATE NAMESPACE IF NOT EXISTS iceberg.temporal")
spark.sql(
f"""
CREATE TABLE {table_name} (
asset_id STRING NOT NULL,
state_valid_from DATE NOT NULL,
state_valid_to DATE NOT NULL,
customer_name STRING,
state_hash STRING NOT NULL,
generated_at TIMESTAMP NOT NULL
)
USING iceberg
TBLPROPERTIES ('format-version' = '3')
"""
) spark.createDataFrame(
[
("EAN-1", date(2024, 2, 1), date(2024, 2, 10), "keep",
"hash-keep", datetime(2024, 2, 1)),
("EAN-1", date(2024, 2, 11), date(2024, 2, 20), "delete-me",
"hash-delete", datetime(2024, 2, 11)),
],
schema=row_schema,
).writeTo(table_name).append() replacement_state_df =
spark.createDataFrame(
[
("EAN-1", date(2024, 2, 1), date(2024, 2, 10), "updated",
"hash-updated", datetime(2030, 1, 1)),
],
schema=row_schema,
) join_columns = ["asset_id", "state_valid_from",
"state_valid_to"]
target_columns = row_schema.fieldNames()
merge_source_df = (
spark.table(table_name)
.alias("target")
.join(replacement_state_df.alias("source"), on=join_columns,
how="full_outer")
.select(
F.coalesce(F.col("source.asset_id"),
F.col("target.asset_id")).alias("asset_id"),
F.coalesce(F.col("source.state_valid_from"),
F.col("target.state_valid_from")).alias(
"state_valid_from"
),
F.coalesce(F.col("source.state_valid_to"),
F.col("target.state_valid_to")).alias("state_valid_to"),
*[F.col(f"source.{column}").alias(column) for column in
target_columns if column not in join_columns],
F.when(F.col("source.asset_id").isNull(), F.lit("delete"))
.when(F.col("target.asset_id").isNull(), F.lit("insert"))
.when(F.col("source.state_hash") == F.col("target.state_hash"),
F.lit("noop"))
.otherwise(F.lit("update"))
.alias("_merge_operation"),
)
)
merge_source_df.createOrReplaceTempView("merge_source")
spark.sql(
f"""
MERGE INTO {table_name} target
USING merge_source source
ON target.asset_id = source.asset_id
AND target.state_valid_from = source.state_valid_from
AND target.state_valid_to = source.state_valid_to
WHEN MATCHED AND source._merge_operation = 'delete' THEN DELETE
WHEN MATCHED AND source._merge_operation = 'update' THEN
UPDATE SET
target.asset_id = source.asset_id,
target.state_valid_from = source.state_valid_from,
target.state_valid_to = source.state_valid_to,
target.customer_name = source.customer_name,
target.state_hash = source.state_hash,
target.generated_at = source.generated_at
WHEN NOT MATCHED AND source._merge_operation = 'insert' THEN
INSERT (asset_id, state_valid_from, state_valid_to,
customer_name, state_hash, generated_at)
VALUES (
source.asset_id,
source.state_valid_from,
source.state_valid_to,
source.customer_name,
source.state_hash,
source.generated_at
)
"""
)
finally:
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
spark.stop()
shutil.rmtree(warehouse_dir, ignore_errors=True)
shutil.rmtree(ivy_dir, ignore_errors=True)
if __name__ == "__main__":
main() {code}
h3. Expected
{{MERGE INTO}} should complete successfully.
It should not fail with an internal planner assertion.
h3. Actual
Spark fails during planning with an internal error:
org.apache.spark.SparkException: [INTERNAL_ERROR] The Spark SQL phase planning
failed with an internal error.
...
Caused by: java.lang.AssertionError: assertion failed: No plan for
TableReference[asset_id#..., state_valid_from#..., state_valid_to#...,
customer_name#..., state_hash#..., generated_at#...]
iceberg.temporal.no_plan_for_tablereference
...
at
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
...
at
org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan.compileSubquery(InsertAdaptiveSparkPlan.scala:167)
h3. Workaround
Downgrading to Spark 4.0.3 avoids the failure in my environment.
h3. Notes
* The failure reproduces consistently with local Spark 4.1.2
* The reduced shape seems to be:
** MERGE INTO Iceberg table
** USING temp view
** temp view lineage reads from the same target Iceberg table
** planner later trips on TableReference[...]
> MERGE INTO over Iceberg temp-view source can fail with assertion failed: No
> plan for TableReference[...]
> --------------------------------------------------------------------------------------------------------
>
> Key: SPARK-57539
> URL: https://issues.apache.org/jira/browse/SPARK-57539
> Project: Spark
> Issue Type: Bug
> Components: Bug, SQL
> Affects Versions: 4.1.2
> Environment: OS: openSUSE Tumbleweed x86_64
> Kernel: Linux 6.18.35-1-longterm
> openjdk version "21.0.10" 2026-01-20 LTS
> OpenJDK Runtime Environment Temurin-21.0.10+7 (build 21.0.10+7-LTS)
> OpenJDK 64-Bit Server VM Temurin-21.0.10+7 (build 21.0.10+7-LTS, mixed mode,
> sharing)
> Reporter: Tomas Sedlak
> Priority: Minor
> Attachments: repro.py
>
>
> h3. Spark version
> * 4.1.2
> h3. Iceberg version
> * 1.11.0
> * Spark runtime package:
> org.apache.iceberg:iceberg-spark-runtime-4.1_2.13:1.11.0
> h3. Repro
>
> {code:python}
> from __future__ import annotations
> import os
> import shutil
> import tempfile
> from datetime import date, datetime
> from pathlib import Path
> from pyspark.sql import SparkSession
> from pyspark.sql import functions as F
> from pyspark.sql import types as T
> def main() -> None:
> warehouse_dir = Path(tempfile.mkdtemp(prefix="spark-iceberg-warehouse-"))
> ivy_dir = Path(tempfile.mkdtemp(prefix="spark-ivy-cache-"))
> os.environ.setdefault("SPARK_LOCAL_IP", "127.0.0.1")
> os.environ.setdefault("SPARK_LOCAL_HOSTNAME", "localhost") spark = (
> SparkSession.builder.master("local[2]")
> .appName("repro-no-plan-for-tablereference")
> .config("spark.jars.packages",
> "org.apache.iceberg:iceberg-spark-runtime-4.1_2.13:1.11.0")
> .config("spark.jars.ivy", str(ivy_dir))
> .config("spark.sql.catalog.iceberg",
> "org.apache.iceberg.spark.SparkCatalog")
> .config("spark.sql.defaultCatalog", "iceberg")
> .config("spark.sql.catalogImplementation", "in-memory")
> .config("spark.sql.catalog.iceberg.type", "hadoop")
> .config("spark.sql.catalog.iceberg.warehouse", str(warehouse_dir))
> .config("spark.sql.extensions",
> "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
> .config("spark.sql.shuffle.partitions", "4")
> .getOrCreate()
> ) table_name = "iceberg.temporal.no_plan_for_tablereference"
> row_schema = T.StructType(
> [
> T.StructField("asset_id", T.StringType(), False),
> T.StructField("state_valid_from", T.DateType(), False),
> T.StructField("state_valid_to", T.DateType(), False),
> T.StructField("customer_name", T.StringType(), True),
> T.StructField("state_hash", T.StringType(), False),
> T.StructField("generated_at", T.TimestampType(), False),
> ]
> ) try:
> spark.sql("CREATE NAMESPACE IF NOT EXISTS iceberg.temporal")
> spark.sql(
> f"""
> CREATE TABLE {table_name} (
> asset_id STRING NOT NULL,
> state_valid_from DATE NOT NULL,
> state_valid_to DATE NOT NULL,
> customer_name STRING,
> state_hash STRING NOT NULL,
> generated_at TIMESTAMP NOT NULL
> )
> USING iceberg
> TBLPROPERTIES ('format-version' = '3')
> """
> ) spark.createDataFrame(
> [
> ("EAN-1", date(2024, 2, 1), date(2024, 2, 10), "keep",
> "hash-keep", datetime(2024, 2, 1)),
> ("EAN-1", date(2024, 2, 11), date(2024, 2, 20), "delete-me",
> "hash-delete", datetime(2024, 2, 11)),
> ],
> schema=row_schema,
> ).writeTo(table_name).append() replacement_state_df =
> spark.createDataFrame(
> [
> ("EAN-1", date(2024, 2, 1), date(2024, 2, 10), "updated",
> "hash-updated", datetime(2030, 1, 1)),
> ],
> schema=row_schema,
> ) join_columns = ["asset_id", "state_valid_from",
> "state_valid_to"]
> target_columns = row_schema.fieldNames()
> merge_source_df = (
> spark.table(table_name)
> .alias("target")
> .join(replacement_state_df.alias("source"), on=join_columns,
> how="full_outer")
> .select(
> F.coalesce(F.col("source.asset_id"),
> F.col("target.asset_id")).alias("asset_id"),
> F.coalesce(F.col("source.state_valid_from"),
> F.col("target.state_valid_from")).alias(
> "state_valid_from"
> ),
> F.coalesce(F.col("source.state_valid_to"),
> F.col("target.state_valid_to")).alias("state_valid_to"),
> *[F.col(f"source.{column}").alias(column) for column in
> target_columns if column not in join_columns],
> F.when(F.col("source.asset_id").isNull(), F.lit("delete"))
> .when(F.col("target.asset_id").isNull(), F.lit("insert"))
> .when(F.col("source.state_hash") ==
> F.col("target.state_hash"), F.lit("noop"))
> .otherwise(F.lit("update"))
> .alias("_merge_operation"),
> )
> )
> merge_source_df.createOrReplaceTempView("merge_source")
> spark.sql(
> f"""
> MERGE INTO {table_name} target
> USING merge_source source
> ON target.asset_id = source.asset_id
> AND target.state_valid_from = source.state_valid_from
> AND target.state_valid_to = source.state_valid_to
> WHEN MATCHED AND source._merge_operation = 'delete' THEN DELETE
> WHEN MATCHED AND source._merge_operation = 'update' THEN
> UPDATE SET
> target.asset_id = source.asset_id,
> target.state_valid_from = source.state_valid_from,
> target.state_valid_to = source.state_valid_to,
> target.customer_name = source.customer_name,
> target.state_hash = source.state_hash,
> target.generated_at = source.generated_at
> WHEN NOT MATCHED AND source._merge_operation = 'insert' THEN
> INSERT (asset_id, state_valid_from, state_valid_to,
> customer_name, state_hash, generated_at)
> VALUES (
> source.asset_id,
> source.state_valid_from,
> source.state_valid_to,
> source.customer_name,
> source.state_hash,
> source.generated_at
> )
> """
> )
> finally:
> spark.sql(f"DROP TABLE IF EXISTS {table_name}")
> spark.stop()
> shutil.rmtree(warehouse_dir, ignore_errors=True)
> shutil.rmtree(ivy_dir, ignore_errors=True)
> if __name__ == "__main__":
> main(){code}
>
> h3. Expected
> {{MERGE INTO}} should complete successfully.
> It should not fail with an internal planner assertion.
> h3. Actual
> Spark fails during planning with an internal error:
> org.apache.spark.SparkException: [INTERNAL_ERROR] The Spark SQL phase
> planning failed with an internal error.
> ...
> Caused by: java.lang.AssertionError: assertion failed: No plan for
> TableReference[asset_id#..., state_valid_from#..., state_valid_to#...,
> customer_name#..., state_hash#..., generated_at#...]
> iceberg.temporal.no_plan_for_tablereference
> ...
> at
> org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
> ...
> at
> org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan.compileSubquery(InsertAdaptiveSparkPlan.scala:167)
> h3. Workaround
> Downgrading to Spark 4.0.3 avoids the failure in my environment.
> h3. Notes
> * The failure reproduces consistently with local Spark 4.1.2
> * The reduced shape seems to be:
> ** MERGE INTO Iceberg table
> ** USING temp view
> ** temp view lineage reads from the same target Iceberg table
> ** planner later trips on TableReference[...]
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]