ammarchalifah commented on issue #15349:
URL: https://github.com/apache/iceberg/issues/15349#issuecomment-4562613303

   Lightweight script to reproduce the issue on EMR 8.0.0 (EMR on EC2)
   
   Python script `repro-spj-bucket-string.py`
   
   ```
   """
   Minimal reproducer for apache/iceberg#15349
   SPJ with bucket partition key on String column throws:
     IllegalArgumentException: Wrong class, expected java.lang.CharSequence, 
but was java.lang.Integer
   
   Run on EMR with SPJ configs enabled. See repro-spj-emr.sh for the full 
command.
   """
   
   from pyspark.sql import SparkSession
   import uuid
   
   # Session-level SQL settings applied by main() after the session exists.
   # Exact production configs from spark-submit; insertion order is the order
   # in which main() applies and verifies them.
   SPJ_CONFIGS = {
       # Iceberg side: keep the table's partition grouping visible to planning.
       "spark.sql.iceberg.planning.preserve-data-grouping": "true",
       # Spark DSv2 bucketing / storage-partitioned join switches.
       "spark.sql.sources.v2.bucketing.enabled": "true",
       "spark.sql.sources.v2.bucketing.pushPartValues.enabled": "true",
       "spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled": "false",
       "spark.sql.requireAllClusterKeysForCoPartition": "false",
       "spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled": "true",
       "spark.sql.sources.v2.bucketing.allowCompatibleTransforms.enabled": "true",
       "spark.sql.sources.v2.bucketing.partition.filter.enabled": "true",
       "spark.sql.sources.v2.bucketing.shuffle.enabled": "true",
       "spark.sql.sources.v2.bucketing.sorting.enabled": "true",
       # Adaptive query execution.
       "spark.sql.adaptive.enabled": "true",
       "spark.sql.adaptive.coalescePartitions.enabled": "true",
       "spark.sql.adaptive.skewJoin.enabled": "true",
       # Broadcast threshold and overwrite mode.
       "spark.sql.autoBroadcastJoinThreshold": "-1",
       "spark.sql.sources.partitionOverwriteMode": "dynamic",
   }
   
   
   def main():
       """Run the apache/iceberg#15349 reproducer end to end.

       Steps:
         1. Build a Spark session with the Iceberg SQL extensions and apply
            every entry of SPJ_CONFIGS, printing whether each one took effect.
         2. Configure the ``glue_catalog`` Iceberg catalog and create a
            uniquely named throwaway database.
         3. Create two identically partitioned tables, load the same rows
            into both, and add extra rows to the source only.
         4. Print the EXPLAIN EXTENDED output for a join and for a MERGE on
            (user_id, category_id), then execute the MERGE.

       Raises:
           Exception: whatever Spark raised; it is printed with a traceback
               and re-raised. The tables and the database are dropped in a
               ``finally`` block either way.
       """
       db = f"spj_repro_{uuid.uuid4().hex[:8]}"

       spark = (
           SparkSession.builder
           .config(
               "spark.sql.extensions",
               "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
           )
           .getOrCreate()
       )

       for key, value in SPJ_CONFIGS.items():
           spark.conf.set(key, value)

       print(f"Spark version: {spark.version}")
       print(f"Using database: {db}")
       print()

       # Verify configs: read each one back and flag any that did not stick.
       for key, expected in SPJ_CONFIGS.items():
           actual = spark.conf.get(key, "NOT SET")
           status = "OK" if actual == expected else "MISMATCH"
           print(f"  [{status}] {key} = {actual}")

       print()

       catalog = "glue_catalog"

       # Ensure catalog is configured (matches EMR production setup)
       spark.conf.set(
           f"spark.sql.catalog.{catalog}",
           "org.apache.iceberg.spark.SparkCatalog",
       )
       spark.conf.set(
           f"spark.sql.catalog.{catalog}.catalog-impl",
           "org.apache.iceberg.aws.glue.GlueCatalog",
       )
       spark.conf.set(
           f"spark.sql.catalog.{catalog}.io-impl",
           "org.apache.iceberg.aws.s3.S3FileIO",
       )

       spark.sql(f"CREATE DATABASE IF NOT EXISTS {catalog}.{db}")

       target = f"{catalog}.{db}.target_table"
       source = f"{catalog}.{db}.source_table"

       try:
           # Matches production: 3 partition fields (identity + 2 buckets).
           # The join keys used below (user_id, category_id) are a SUBSET of
           # the partition keys, which triggers the
           # allowJoinKeysSubsetOfPartitionKeys code path.
           # Both tables share one schema and partition spec.
           for table in (target, source):
               spark.sql(f"""
                   CREATE TABLE {table} (
                       user_id STRING,
                       post_id STRING,
                       collected_date STRING,
                       category_id STRING,
                       payload STRING
                   )
                   USING iceberg
                   PARTITIONED BY (
                       collected_date,
                       bucket(32, user_id),
                       bucket(32, category_id)
                   )
               """)

           dates = ["2026-05-01", "2026-05-02", "2026-05-03"]
           categories = ["cat_a", "cat_b", "cat_c", "cat_d"]

           # Same rows go into both tables, one INSERT per (batch, date).
           print("Inserting data in multiple batches ...")
           for batch in range(3):
               for date in dates:
                   rows = []
                   for i in range(50):
                       uid = f"user_{batch * 50 + i}"
                       pid = f"post_{batch * 50 + i}_{date}"
                       cat = categories[i % len(categories)]
                       rows.append(
                           f"('{uid}', '{pid}', '{date}', '{cat}', "
                           f"'payload_b{batch}_{i}')"
                       )
                   values_str = ", ".join(rows)
                   spark.sql(f"INSERT INTO {target} VALUES {values_str}")
                   spark.sql(f"INSERT INTO {source} VALUES {values_str}")

           # Source-only rows, so the MERGE has unmatched rows to insert.
           for date in dates:
               rows = []
               for i in range(20):
                   uid = f"new_user_{i}"
                   pid = f"new_post_{i}_{date}"
                   cat = categories[i % len(categories)]
                   rows.append(
                       f"('{uid}', '{pid}', '{date}', '{cat}', "
                       f"'new_payload_{i}')"
                   )
               values_str = ", ".join(rows)
               spark.sql(f"INSERT INTO {source} VALUES {values_str}")

           target_count = spark.sql(
               f"SELECT count(*) FROM {target}"
           ).collect()[0][0]
           source_count = spark.sql(
               f"SELECT count(*) FROM {source}"
           ).collect()[0][0]
           print(f"Target: {target_count} rows, Source: {source_count} rows")
           print()

           # Substrings counted in each plan text printed below.
           indicators = (
               "KeyGroupedPartitioning",
               "Exchange hashpartitioning",
           )

           # Join on user_id + category_id: both map to bucket transforms.
           # collected_date is NOT in the join, which triggers
           # allowJoinKeysSubsetOfPartitionKeys.
           join_sql = f"""
               SELECT t.user_id, t.post_id, t.collected_date, t.payload
               FROM {target} t
               INNER JOIN {source} s
               ON t.user_id = s.user_id AND t.category_id = s.category_id
           """

           print("=== EXPLAIN SELECT JOIN ===")
           join_plan = spark.sql(
               f"EXPLAIN EXTENDED {join_sql}"
           ).collect()[0][0]
           print(join_plan)
           print("=== END EXPLAIN SELECT JOIN ===")
           print()

           for indicator in indicators:
               count = join_plan.count(indicator)
               print(f">>> '{indicator}' count in join plan: {count}")

           print()

           # MERGE with same join keys as production: user_id + category_id
           merge_sql = f"""
               MERGE INTO {target} t USING {source} s
               ON t.user_id = s.user_id AND t.category_id = s.category_id
               WHEN MATCHED THEN UPDATE SET *
               WHEN NOT MATCHED THEN INSERT *
           """

           print("=== EXPLAIN MERGE INTO ===")
           merge_plan = spark.sql(
               f"EXPLAIN EXTENDED {merge_sql}"
           ).collect()[0][0]
           print(merge_plan)
           print("=== END EXPLAIN MERGE INTO ===")
           print()

           for indicator in indicators:
               count = merge_plan.count(indicator)
               print(f">>> '{indicator}' count in merge plan: {count}")

           print()

           print("Running MERGE INTO ...")
           spark.sql(merge_sql)

           final_count = spark.sql(
               f"SELECT count(*) FROM {target}"
           ).collect()[0][0]
           print(
               "SUCCESS: MERGE completed. "
               f"Target table has {final_count} rows."
           )

       except Exception as e:
           # Report the failure in the step log, then re-raise so the
           # process still exits with an error.
           print(f"FAILED: {e}")
           import traceback
           traceback.print_exc()
           raise
       finally:
           # Always remove the throwaway tables and database.
           spark.sql(f"DROP TABLE IF EXISTS {target}")
           spark.sql(f"DROP TABLE IF EXISTS {source}")
           spark.sql(f"DROP DATABASE IF EXISTS {catalog}.{db}")
           print("Cleanup complete.")
   
   
   # Run the reproducer only when this file is executed as a script,
   # not when it is imported as a module.
   if __name__ == "__main__":
       main()
   
   ```
   
   
   Bash script `repro-spj-emr.sh`
   
   ```
   #!/usr/bin/env bash
   #
   # Reproducer for apache/iceberg#15349 on Amazon EMR
   # SPJ + bucket(String) → IllegalArgumentException
   #
   # Uploads repro-spj-bucket-string.py to S3, then creates an
   # auto-terminating EMR cluster that runs it as a single Spark step.
   #
   # Prerequisites:
   #   - AWS CLI configured with permissions for EMR, S3, Glue
   #   - An S3 bucket for scripts and warehouse
   #   - An EMR-compatible subnet and service role
   #
   # Usage:
   #   1. Edit the variables below
   #   2. ./repro-spj-emr.sh
   #   3. Check the EMR step logs for the error
   #
   set -euo pipefail

   # ─── EDIT THESE ─────────────────────────────────────────────────────
   S3_BUCKET="{bucket_name}"        # your S3 bucket
   SUBNET_ID="{subnet_id}"          # your VPC subnet
   EMR_RELEASE="emr-spark-8.0.0"    # EMR release with Spark 4.0
   SERVICE_ROLE="{service_role}"
   INSTANCE_PROFILE="{instance_profile}"
   REGION="{region}"
   # ────────────────────────────────────────────────────────────────────

   SCRIPT_S3="s3://${S3_BUCKET}/repro-spj/repro-spj-bucket-string.py"
   WAREHOUSE="s3://${S3_BUCKET}/repro-spj/warehouse/"
   LOG_URI="s3://${S3_BUCKET}/repro-spj/logs/"

   # --conf values passed to the Spark step, in order. They are joined into
   # the comma-separated Args=[...] list expected by `--steps` shorthand.
   STEP_CONFS=(
     "spark.sql.adaptive.enabled=true"
     "spark.sql.autoBroadcastJoinThreshold=-1"
     "spark.sql.iceberg.planning.preserve-data-grouping=true"
     "spark.sql.sources.v2.bucketing.enabled=true"
     "spark.sql.sources.v2.bucketing.pushPartValues.enabled=true"
     "spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled=false"
     "spark.sql.requireAllClusterKeysForCoPartition=false"
     "spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled=true"
     "spark.sql.sources.v2.bucketing.allowCompatibleTransforms.enabled=true"
   )
   STEP_ARGS="--deploy-mode,cluster"
   for conf in "${STEP_CONFS[@]}"; do
     STEP_ARGS+=",--conf,${conf}"
   done
   STEP_ARGS+=",${SCRIPT_S3}"

   echo "Uploading PySpark script to ${SCRIPT_S3}..."
   aws s3 cp repro-spj-bucket-string.py "${SCRIPT_S3}" --region "${REGION}"

   echo "Creating EMR cluster with step..."
   # The JSON blocks are single-quoted; ${WAREHOUSE} is spliced in by closing
   # and reopening the single quotes around a double-quoted expansion.
   CLUSTER_ID=$(aws emr create-cluster \
     --name "iceberg-spj-repro" \
     --release-label "${EMR_RELEASE}" \
     --applications Name=Spark \
     --region "${REGION}" \
     --log-uri "${LOG_URI}" \
     --service-role "${SERVICE_ROLE}" \
     --ec2-attributes "InstanceProfile=${INSTANCE_PROFILE},SubnetId=${SUBNET_ID}" \
     --instance-fleets '[
       {
         "Name": "Primary",
         "InstanceFleetType": "MASTER",
         "TargetOnDemandCapacity": 1,
         "InstanceTypeConfigs": [{"InstanceType": "m5.xlarge"}]
       },
       {
         "Name": "Core",
         "InstanceFleetType": "CORE",
         "TargetOnDemandCapacity": 2,
         "InstanceTypeConfigs": [{"InstanceType": "m5.xlarge"}]
       }
     ]' \
     --configurations '[
       {
         "Classification": "spark-defaults",
         "Properties": {
           "spark.sql.catalog.glue_catalog": "org.apache.iceberg.spark.SparkCatalog",
           "spark.sql.catalog.glue_catalog.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
           "spark.sql.catalog.glue_catalog.warehouse": "'"${WAREHOUSE}"'",
           "spark.sql.catalog.glue_catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
           "spark.sql.catalog.glue_catalog.skip-name-validation": "true",
           "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
           "spark.jars": "/usr/share/aws/iceberg/lib/iceberg-spark4-runtime.jar",
           "spark.sql.iceberg.planning.preserve-data-grouping": "true",
           "spark.sql.sources.v2.bucketing.enabled": "true",
           "spark.sql.sources.v2.bucketing.pushPartValues.enabled": "true",
           "spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled": "false",
           "spark.sql.requireAllClusterKeysForCoPartition": "false",
           "spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled": "true",
           "spark.sql.sources.v2.bucketing.allowCompatibleTransforms.enabled": "true",
           "spark.sql.sources.v2.bucketing.partition.filter.enabled": "true",
           "spark.sql.sources.v2.bucketing.shuffle.enabled": "true",
           "spark.sql.sources.v2.bucketing.sorting.enabled": "true",
           "spark.sql.adaptive.enabled": "true",
           "spark.sql.adaptive.coalescePartitions.enabled": "true",
           "spark.sql.adaptive.skewJoin.enabled": "true",
           "spark.sql.autoBroadcastJoinThreshold": "-1",
           "spark.sql.sources.partitionOverwriteMode": "dynamic"
         }
       },
       {
         "Classification": "iceberg-defaults",
         "Properties": {
           "iceberg.enabled": "true"
         }
       }
     ]' \
     --steps "Type=Spark,Name=SPJ-Bucket-String-Repro,ActionOnFailure=CONTINUE,Args=[${STEP_ARGS}]" \
     --auto-terminate \
     --query 'ClusterId' \
     --output text)

   echo ""
   echo "Cluster created: ${CLUSTER_ID}"
   echo ""
   echo "Monitor progress:"
   echo "  aws emr describe-cluster --cluster-id ${CLUSTER_ID} --region ${REGION} --query 'Cluster.Status.State'"
   echo ""
   echo "View step logs after completion:"
   echo "  aws emr list-steps --cluster-id ${CLUSTER_ID} --region ${REGION}"
   echo "  aws s3 ls ${LOG_URI}${CLUSTER_ID}/steps/"
   echo ""
   echo "The cluster will auto-terminate after the step completes."
   echo "If the bug is present, the step will FAIL with:"
   echo "  IllegalArgumentException: Wrong class, expected java.lang.CharSequence, but was java.lang.Integer"
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to