internetcoffeephone commented on issue #14814:
URL: https://github.com/apache/iceberg/issues/14814#issuecomment-3655144973
See the code below to reproduce the issue. My Spark install uses OpenJDK 17 from macOS Homebrew, so you'll probably have to tweak the `JAVA_HOME` path to match your own Spark install in order to run it.
PySpark version: `pyspark==3.5.1`
The problem seems to occur when all of the below are true:
1. `merge-on-read` is enabled (without this, no delete files are written)
2. Rows are deleted (again, without this no delete files are written)
3. Two tags point to the same snapshot ID
It looks like a deduplication step is missing when rewriting delete files for a snapshot ID that has `>1` tag pointing at it. I'm not sure where to go from here; could any contributors more experienced with this project point me to where I should look to implement a fix?
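To make concrete what I mean by a deduplication step: I'm only guessing at the internals, but if the rewrite procedure plans one (source, target) copy task per file per reachable snapshot/ref, then a snapshot reachable through two tags would plan the same delete file twice, and the second copy fails with `FileAlreadyExistsException`. A rough sketch of the kind of dedup I have in mind (names and structure are hypothetical, not Iceberg's actual code):
```python
# Hypothetical sketch only -- not Iceberg's actual internals.
# Assume the planner emits (source_path, target_path) copy tasks and that two
# tags on the same snapshot cause the same delete file to be planned twice.

def dedupe_copy_tasks(copy_tasks):
    """Drop copy tasks whose target path has already been planned."""
    seen = set()
    unique_tasks = []
    for source_path, target_path in copy_tasks:
        if target_path in seen:
            continue  # already planned, e.g. via a second tag on the same snapshot
        seen.add(target_path)
        unique_tasks.append((source_path, target_path))
    return unique_tasks


# Example: the same deletes file planned twice because tags 1_0 and 1_1
# both point at the same snapshot.
tasks = [
    ("file:/tmp/warehouse/data/00000-3-deletes.parquet",
     "file:/tmp/warehouse_rewrite/data/00000-3-deletes.parquet"),
    ("file:/tmp/warehouse/data/00000-3-deletes.parquet",
     "file:/tmp/warehouse_rewrite/data/00000-3-deletes.parquet"),
]
print(dedupe_copy_tasks(tasks))  # only one copy task survives
```
Full repro script: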
```python
import os
import datetime as dt
import shutil
import subprocess
# Set environment variables to force local-only mode
os.environ["SPARK_LOCAL_IP"] = "127.0.0.1"
os.environ["SPARK_LOCAL_HOSTNAME"] = "localhost"
os.environ["JAVA_HOME"] =
"/opt/homebrew/opt/openjdk@17/libexec/openjdk.jdk/Contents/Home"
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, DateType
# Set up warehouse path
warehouse_path = os.path.abspath("/tmp/warehouse")
warehouse_rewrite_path = f"{warehouse_path}_rewrite"
table_path_full = "local.default.test_table"
for delete_path in [warehouse_path, warehouse_rewrite_path]:
    if os.path.exists(delete_path):
        print(f"Deleting {delete_path}")
        shutil.rmtree(delete_path)
os.makedirs(warehouse_path)
# Download Iceberg JAR if needed
jar_dir = os.path.abspath("jars")
if not os.path.exists(jar_dir):
    os.makedirs(jar_dir)
jar_path = f"{jar_dir}/iceberg-spark-runtime-3.5_2.12-1.10.0.jar"
if not os.path.exists(jar_path):
    print("Downloading Iceberg JAR...")
    url = "https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.10.0/iceberg-spark-runtime-3.5_2.12-1.10.0.jar"
    subprocess.run(["curl", "-L", url, "-o", jar_path], check=True)
# Initialize Spark session with strict local mode
spark = SparkSession.builder \
    .appName("IcebergExample") \
    .master("local") \
    .config("spark.driver.bindAddress", "127.0.0.1") \
    .config("spark.driver.host", "localhost") \
    .config("spark.jars", jar_path) \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") \
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.local.type", "hadoop") \
    .config("spark.sql.catalog.local.warehouse", f"file://{warehouse_path}") \
    .getOrCreate()
# Create namespace
spark.sql("CREATE NAMESPACE IF NOT EXISTS local.default")
# Define schema using Spark's StructType
schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("dt", DateType(), False)
])
# Create table with merge-on-read properties
spark.sql(f"""
CREATE TABLE IF NOT EXISTS local.default.test_table (
id INT NOT NULL,
dt DATE NOT NULL
)
USING iceberg
PARTITIONED BY (month(dt))
TBLPROPERTIES (
'write.merge.mode' = 'merge-on-read',
'write.delete.mode' = 'merge-on-read',
'write.update.mode' = 'merge-on-read'
)
""")
initial_data = [(0, dt.date(2024, 1, 1)), (1, dt.date(2024, 1, 2))]
df = spark.createDataFrame(initial_data, schema)
df.writeTo("local.default.test_table").append()
# Keep adding data, deletes and tags
for i in range(1, 3):
    # Create new data
    new_data = [(i, dt.date(2024, i + 1, 1)),
                (i + 1000, dt.date(2024, i + 1, 2))]
    new_df = spark.createDataFrame(new_data, schema)

    # Delete rows to generate .deletes.parquet files
    spark.sql(f"DELETE FROM local.default.test_table WHERE id = {i - 1}")

    # Append new data
    new_df.writeTo("local.default.test_table").append()

    # Get current snapshot ID
    snapshot_info = spark.sql(
        "SELECT snapshot_id FROM local.default.test_table.snapshots "
        "ORDER BY committed_at DESC LIMIT 1")
    snapshot_id = snapshot_info.collect()[0][0]

    # Create two tags for a single snapshot
    for tag_index in range(2):
        tag = f"{i}_{tag_index}"
        print(f"Creating snapshot with tag {tag}")
        spark.sql(f"ALTER TABLE local.default.test_table CREATE TAG `{tag}` "
                  f"AS OF VERSION {snapshot_id}")

# Call rewrite table path procedure
# This will throw an exception:
# org.apache.hadoop.fs.FileAlreadyExistsException: File already exists:
# file:/tmp/warehouse_rewrite/default/test_table/data/dt_month=2024-01/00000-3-bee412f1-8b39-45a4-9b0e-b014d8e7bdd6-00001-deletes.parquet
query = f"""
CALL local.system.rewrite_table_path(
    table => 'local.default.test_table',
    source_prefix => 'file:{warehouse_path}',
    target_prefix => 'file:{warehouse_rewrite_path}',
    staging_location => 'file:{warehouse_rewrite_path}'
)"""
print(query)
spark.sql(query)
```
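For what it's worth, before the `CALL` you can confirm that both tags resolve to the same snapshot ID via the `refs` metadata table:
```python
# Each tag/branch is one row; tags 1_0/1_1 (and 2_0/2_1) should show the
# same snapshot_id, which is the condition that triggers the failure.
spark.sql(
    "SELECT name, type, snapshot_id "
    "FROM local.default.test_table.refs ORDER BY snapshot_id"
).show(truncate=False)
```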