soumilshah1995 commented on issue #10250: URL: https://github.com/apache/iceberg/issues/10250#issuecomment-3218114973
Hi there, I tried the above example; it's still doing a sort merge join. ``` from pyspark.sql import SparkSession from pyspark.sql.functions import spark_partition_id import os,sys # ---------------------------- # Environment # ---------------------------- os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11" ICEBERG_VERSION = "1.5.0" # SPJ support SPARK_VERSION = "3.4" SUBMIT_ARGS = f"--packages org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:{ICEBERG_VERSION} pyspark-shell" os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS os.environ['PYSPARK_PYTHON'] = sys.executable warehouse_path = "/Users/soumilshah/Desktop/warehouse" # ---------------------------- # Spark Session # ---------------------------- spark = SparkSession.builder \ .appName("IcebergSPJExample") \ .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ .config("spark.sql.catalog.dev", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.dev.type", "hadoop") \ .config("spark.sql.catalog.dev.warehouse", warehouse_path) \ .config("spark.sql.join.preferSortMergeJoin", "false") \ .config("spark.sql.sources.v2.bucketing.enabled", "true") \ .config("spark.sql.sources.v2.bucketing.pushPartValues.enabled", "true") \ .config("spark.sql.iceberg.planning.preserve-data-grouping", "true") \ .config("spark.sql.requireAllClusterKeysForCoPartition", "false") \ .config("spark.sql.autoBroadcastJoinThreshold", "-1") \ .config("spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled", "true") \ .getOrCreate() # ---------------------------- # Create two tables with same bucket partitioning # ---------------------------- df = spark.range(0, 100_000) # simple numeric dataset # Add partition column based on Spark partition id df_a = df.repartition(10).withColumn("part", spark_partition_id()) df_b = df.repartition(10).withColumn("part", spark_partition_id()) # Write table A 
df_a.write.partitionBy("part").format("iceberg").mode("overwrite").saveAsTable("dev.ice1") # Write table B df_b.write.partitionBy("part").format("iceberg").mode("overwrite").saveAsTable("dev.ice2") spark.sql("SET spark.sql.autoBroadcastJoinThreshold=-1") spark.sql("SET spark.sql.join.preferSortMergeJoin=false") spark.sql("SET `spark.sql.sources.v2.bucketing.enabled`=true") spark.sql("SET `spark.sql.sources.v2.bucketing.pushPartValues.enabled`=true") spark.sql("SET `spark.sql.iceberg.planning.preserve-data-grouping`=true") spark.sql("SET `spark.sql.requireAllClusterKeysForCoPartition`=false") spark.sql("SET `spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled`=true") spark.sql(""" SELECT a.id AS id1, b.id AS id2 FROM dev.ice1 a JOIN dev.ice2 b ON a.id = b.id AND a.part = b.part LIMIT 20 """).show() ``` <img width="551" height="620" alt="Image" src="https://github.com/user-attachments/assets/d1560a63-0cd6-4a41-9034-f6424f6c96a2" /> -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org