soumilshah1995 commented on issue #10250:
URL: https://github.com/apache/iceberg/issues/10250#issuecomment-3218114973

   Hi there, I tried the above example, but it's still doing a sort-merge join:
   ```
   from pyspark.sql import SparkSession
   from pyspark.sql.functions import spark_partition_id
   import os,sys
   
   # ----------------------------
   # Environment
   # ----------------------------
   os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"
   
   ICEBERG_VERSION = "1.5.0"  # SPJ support
   SPARK_VERSION = "3.4"
   
   SUBMIT_ARGS = f"--packages org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:{ICEBERG_VERSION} pyspark-shell"
   os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
   os.environ['PYSPARK_PYTHON'] = sys.executable
   
   warehouse_path = "/Users/soumilshah/Desktop/warehouse"
   
   # ----------------------------
   # Spark Session
   # ----------------------------
   spark = SparkSession.builder \
       .appName("IcebergSPJExample") \
       .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
       .config("spark.sql.catalog.dev", "org.apache.iceberg.spark.SparkCatalog") \
       .config("spark.sql.catalog.dev.type", "hadoop") \
       .config("spark.sql.catalog.dev.warehouse", warehouse_path) \
       .config("spark.sql.join.preferSortMergeJoin", "false") \
       .config("spark.sql.sources.v2.bucketing.enabled", "true") \
       .config("spark.sql.sources.v2.bucketing.pushPartValues.enabled", "true") \
       .config("spark.sql.iceberg.planning.preserve-data-grouping", "true") \
       .config("spark.sql.requireAllClusterKeysForCoPartition", "false") \
       .config("spark.sql.autoBroadcastJoinThreshold", "-1") \
       .config("spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled", "true") \
       .getOrCreate()
   
   
   # ----------------------------
   # Create two tables with the same identity partitioning on `part`
   # ----------------------------
   df = spark.range(0, 100_000)  # simple numeric dataset
   
   # Add partition column based on Spark partition id
   df_a = df.repartition(10).withColumn("part", spark_partition_id())
   df_b = df.repartition(10).withColumn("part", spark_partition_id())
   
   # Write table A
   df_a.write.partitionBy("part").format("iceberg").mode("overwrite").saveAsTable("dev.ice1")
   
   # Write table B
   df_b.write.partitionBy("part").format("iceberg").mode("overwrite").saveAsTable("dev.ice2")
   
   spark.sql("SET spark.sql.autoBroadcastJoinThreshold=-1")
   spark.sql("SET spark.sql.join.preferSortMergeJoin=false")
   spark.sql("SET `spark.sql.sources.v2.bucketing.enabled`=true")
   spark.sql("SET `spark.sql.sources.v2.bucketing.pushPartValues.enabled`=true")
   spark.sql("SET `spark.sql.iceberg.planning.preserve-data-grouping`=true")
   spark.sql("SET `spark.sql.requireAllClusterKeysForCoPartition`=false")
   spark.sql("SET `spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled`=true")
   
   
   spark.sql("""
   SELECT a.id AS id1, b.id AS id2
   FROM dev.ice1 a
   JOIN dev.ice2 b
   ON a.id = b.id AND a.part = b.part
   LIMIT 20
   """).show()
   ```
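
A storage-partitioned join removes the shuffle, not necessarily the join operator, so a plan can still show `SortMergeJoin` while having no `Exchange` nodes underneath it. A minimal sketch for checking that on the plan text is below. The helper name `count_shuffles` and both sample plans are hypothetical and hand-written for illustration; they are not output from the run above.

```python
import re


def count_shuffles(plan_text: str) -> int:
    """Count shuffle Exchange nodes in the text of a Spark physical plan.

    BroadcastExchange and ReusedExchange are deliberately not counted,
    because the name must not be preceded by another letter.
    """
    return len(re.findall(r"(?<![A-Za-z])Exchange\b", plan_text))


# Hypothetical plan text: both join sides are shuffled before the join.
plan_with_shuffle = """
SortMergeJoin [id, part], [id, part], Inner
:- Sort [id ASC, part ASC]
:  +- Exchange hashpartitioning(id, part, 200)
:     +- BatchScan dev.ice1
+- Sort [id ASC, part ASC]
   +- Exchange hashpartitioning(id, part, 200)
      +- BatchScan dev.ice2
"""

# Hypothetical plan text: same join operator, but no shuffle below it.
plan_without_shuffle = """
SortMergeJoin [id, part], [id, part], Inner
:- Sort [id ASC, part ASC]
:  +- BatchScan dev.ice1
+- Sort [id ASC, part ASC]
   +- BatchScan dev.ice2
"""

print(count_shuffles(plan_with_shuffle))
print(count_shuffles(plan_without_shuffle))
```

With a live session, the plan text could be captured by wrapping the query's `explain()` call in `contextlib.redirect_stdout` (assuming PySpark prints the plan from the Python side) and passing the captured string to the helper.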
   
   
   <img width="551" height="620" alt="Image" src="https://github.com/user-attachments/assets/d1560a63-0cd6-4a41-9034-f6424f6c96a2" />


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

