internetcoffeephone commented on issue #14814:
URL: https://github.com/apache/iceberg/issues/14814#issuecomment-3655144973

   See the code below to reproduce the issue.
   My Spark install uses OpenJDK 17 from macOS Homebrew, so you will probably have to tweak `JAVA_HOME` to match your own Spark install in order to run it.
   PySpark version: `pyspark==3.5.1`
   
   The problem seems to occur when all of the below are true:
   1. `merge-on-read` is enabled (without this, no delete files are written)
   2. Rows are deleted (for the same reason as 1.: otherwise no delete files exist)
   3. Two tags point to the same snapshot ID
   
   It looks like a deduplication step is missing when rewriting delete files for a snapshot ID that has more than one tag pointing at it. I'm not sure where to go from here; could any contributors more experienced with this project point me to where I should look to implement a fix?
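   
   To illustrate the kind of deduplication I have in mind, here is a rough sketch in plain Python (this is not Iceberg's actual rewrite code; `plan_delete_file_copies` and the per-ref plan structure are made up purely for illustration):
   
   ```python
   # Sketch only: when delete files are collected per reference, two tags on the
   # same snapshot contribute the same delete file twice, so the copy plan needs
   # to be deduplicated by path before anything is written to the target prefix.
   from typing import Dict, List
   
   def plan_delete_file_copies(delete_files_per_ref: Dict[str, List[str]],
                               source_prefix: str,
                               target_prefix: str) -> Dict[str, str]:
       """Build a source -> target copy plan, keeping each delete file once."""
       copy_plan: Dict[str, str] = {}
       for ref, delete_files in delete_files_per_ref.items():
           for source_path in delete_files:
               target_path = source_path.replace(source_prefix, target_prefix, 1)
               # Two tags pointing at one snapshot yield the same source_path;
               # planning the copy once avoids writing the target file twice.
               copy_plan.setdefault(source_path, target_path)
       return copy_plan
   
   # Two tags on the same snapshot produce identical delete-file lists:
   deletes = ["file:/tmp/warehouse/default/test_table/data/x-deletes.parquet"]
   plan = plan_delete_file_copies(
       {"1_0": deletes, "1_1": deletes},
       source_prefix="file:/tmp/warehouse",
       target_prefix="file:/tmp/warehouse_rewrite",
   )
   assert len(plan) == 1  # without the dedup, the same target is written twice
   ```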
   
   ```python
   import os
   import datetime as dt
   import shutil
   import subprocess
   
   # Set environment variables to force local-only mode
   os.environ["SPARK_LOCAL_IP"] = "127.0.0.1"
   os.environ["SPARK_LOCAL_HOSTNAME"] = "localhost"
   os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@17/libexec/openjdk.jdk/Contents/Home"
   
   from pyspark.sql import SparkSession
   from pyspark.sql.types import StructType, StructField, IntegerType, DateType
   
   # Set up warehouse path
   warehouse_path = os.path.abspath("/tmp/warehouse")
   warehouse_rewrite_path = f"{warehouse_path}_rewrite"
   table_path_full = "local.default.test_table"
   for delete_path in [warehouse_path, warehouse_rewrite_path]:
       if os.path.exists(delete_path):
           print(f"Deleting {delete_path}")
           shutil.rmtree(delete_path)
   os.makedirs(warehouse_path)
   
   # Download Iceberg JAR if needed
   jar_dir = os.path.abspath("jars")
   if not os.path.exists(jar_dir):
       os.makedirs(jar_dir)
   
   jar_path = f"{jar_dir}/iceberg-spark-runtime-3.5_2.12-1.10.0.jar"
   if not os.path.exists(jar_path):
       print("Downloading Iceberg JAR...")
       url = "https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.10.0/iceberg-spark-runtime-3.5_2.12-1.10.0.jar"
       subprocess.run(["curl", "-L", url, "-o", jar_path], check=True)
   
   # Initialize Spark session with strict local mode
   spark = SparkSession.builder \
       .appName("IcebergExample") \
       .master("local") \
       .config("spark.driver.bindAddress", "127.0.0.1") \
       .config("spark.driver.host", "localhost") \
       .config("spark.jars", jar_path) \
       .config("spark.sql.extensions",
               "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
       .config("spark.sql.catalog.spark_catalog",
               "org.apache.iceberg.spark.SparkSessionCatalog") \
       .config("spark.sql.catalog.local",
               "org.apache.iceberg.spark.SparkCatalog") \
       .config("spark.sql.catalog.local.type",
               "hadoop") \
       .config("spark.sql.catalog.local.warehouse",
               f"file://{warehouse_path}") \
       .getOrCreate()
   
   # Create namespace
   spark.sql("CREATE NAMESPACE IF NOT EXISTS local.default")
   
   # Define schema using Spark's StructType
   schema = StructType([
       StructField("id", IntegerType(), False),
       StructField("dt", DateType(), False)
   ])
   
   # Create table with merge-on-read properties
   spark.sql(f"""
   CREATE TABLE IF NOT EXISTS local.default.test_table (
       id INT NOT NULL,
       dt DATE NOT NULL
   )
   USING iceberg
   PARTITIONED BY (month(dt))
   TBLPROPERTIES (
       'write.merge.mode' = 'merge-on-read',
       'write.delete.mode' = 'merge-on-read',
       'write.update.mode' = 'merge-on-read'
   )
   """)
   
   initial_data = [(0, dt.date(2024, 1, 1)), (1, dt.date(2024, 1, 2))]
   df = spark.createDataFrame(initial_data, schema)
   df.writeTo("local.default.test_table").append()
   
   # Keep adding data, deletes and tags
   for i in range(1, 3):
       # Create new data
       new_data = [(i, dt.date(2024, i + 1, 1)),
                   (i + 1000, dt.date(2024, i + 1, 2))]
       new_df = spark.createDataFrame(new_data, schema)
   
       # Delete rows to generate .deletes.parquet files
       spark.sql(f"DELETE FROM local.default.test_table WHERE id = {i - 1}")
   
       # Append new data
       new_df.writeTo("local.default.test_table").append()
   
       # Get current snapshot ID
       snapshot_info = spark.sql(
           "SELECT snapshot_id FROM local.default.test_table.snapshots ORDER BY committed_at DESC LIMIT 1")
       snapshot_id = snapshot_info.collect()[0][0]
   
       # Create two tags for a single snapshot
       for tag_index in range(2):
           tag = f"{i}_{tag_index}"
           print(f"Creating snapshot with tag {tag}")
           spark.sql(f"ALTER TABLE local.default.test_table CREATE TAG `{tag}` AS OF VERSION {snapshot_id}")
   
   
   # Call rewrite table path procedure
   # This will throw an exception:
   # org.apache.hadoop.fs.FileAlreadyExistsException: File already exists:
   # file:/tmp/warehouse_rewrite/default/test_table/data/dt_month=2024-01/00000-3-bee412f1-8b39-45a4-9b0e-b014d8e7bdd6-00001-deletes.parquet
   query = f"""
   CALL local.system.rewrite_table_path(
       table => 'local.default.test_table',
       source_prefix => 'file:{warehouse_path}',
       target_prefix => 'file:{warehouse_rewrite_path}',
       staging_location => 'file:{warehouse_rewrite_path}'
   )"""
   print(query)
   spark.sql(query)
   ```
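   
   As a sanity check, the two-tags-per-snapshot setup can be confirmed from Iceberg's metadata tables before calling the procedure (this assumes the `spark` session and the table created by the script above):
   
   ```python
   # Confirm that two tags reference the same snapshot_id.
   spark.sql("""
       SELECT name, type, snapshot_id
       FROM local.default.test_table.refs
       ORDER BY snapshot_id, name
   """).show(truncate=False)
   
   # List the delete files that rewrite_table_path will need to copy.
   spark.sql("""
       SELECT content, file_path
       FROM local.default.test_table.all_delete_files
   """).show(truncate=False)
   ```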

