rajasinghr commented on issue #7832:
URL: https://github.com/apache/iceberg/issues/7832#issuecomment-1599139147
Hi @RussellSpitzer, I tested SPJ following what you shared above, but I still can't get storage-partitioned joins to kick in: the plan shows three Exchange nodes instead of one. Let me know if I'm missing anything.
```
# creating session
%session_id_prefix native-iceberg-sql-
%glue_version 3.0
%idle_timeout 600
%extra_jars s3://com.rippling.controlplane.usw2.datainfrabuilds/iceberg/iceberg-spark-runtime-3.3_2.12-1.3.0.jar
%%configure
{
    "--conf": "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    "--datalake-formats": "iceberg"
}
```
```
catalog_name = "default"
bucket_name = "dev_bucket"
bucket_prefix = ""
database_name = "core"
warehouse_path = f"s3://{bucket_name}/{bucket_prefix}"
```
```
# Spark configuration
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config("spark.sql.warehouse.dir", warehouse_path) \
    .config(f"spark.sql.catalog.{catalog_name}", "org.apache.iceberg.spark.SparkCatalog") \
    .config(f"spark.sql.catalog.{catalog_name}.warehouse", warehouse_path) \
    .config(f"spark.sql.catalog.{catalog_name}.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
    .config(f"spark.sql.catalog.{catalog_name}.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.sources.v2.bucketing.enabled", "true") \
    .config("spark.sql.sources.v2.bucketing.push.part.values.enabled", "true") \
    .config("spark.sql.autoBroadcastJoinThreshold", -1) \
    .config("spark.sql.adaptive.enabled", "false") \
    .config("spark.sql.requireAllClusterKeysForCoPartition", "false") \
    .config("spark.sql.iceberg.planning.preserve-data-grouping", "true") \
    .config("spark.sql.shuffle.partitions", 4) \
    .getOrCreate()
```
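One thing that may be worth double-checking here (a minimal sketch, not part of the original snippet): whether these SQL settings are actually active on the session at query time, since `getOrCreate()` in a Glue interactive session may return an already-running session, in which case static confs such as `spark.sql.extensions` won't take effect.
```
# Minimal sketch (added for illustration): print what the live session actually
# has for the SPJ-related settings, to rule out configs being dropped because
# getOrCreate() reused an existing Glue session.
for key in [
    "spark.sql.extensions",
    "spark.sql.sources.v2.bucketing.enabled",
    "spark.sql.sources.v2.bucketing.push.part.values.enabled",
    "spark.sql.iceberg.planning.preserve-data-grouping",
]:
    print(key, "=", spark.conf.get(key, "<not set>"))
```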
```
%%sql
CREATE OR REPLACE TABLE default.core.tbl1 (id BIGINT, dep STRING)
USING iceberg
PARTITIONED BY (bucket(8, id), dep)
TBLPROPERTIES ('read.split.target-size'='16777216','read.split.open-file-cost'='16777216')

%%sql
INSERT INTO default.core.tbl1 VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'), (5, 'e'), (6, 'f');

%%sql
CREATE OR REPLACE TABLE default.core.tbl2 (id BIGINT, dep STRING)
USING iceberg
PARTITIONED BY (bucket(8, id), dep)
TBLPROPERTIES ('read.split.target-size'='16777216','read.split.open-file-cost'='16777216')

%%sql
INSERT INTO default.core.tbl2 VALUES (1, 'a'), (2, 'b'), (3, 'c');

query = """EXPLAIN SELECT t1.id FROM default.core.tbl1 t1
    INNER JOIN default.core.tbl2 t2
    ON t1.id = t2.id AND t1.dep = t2.dep
    ORDER BY t1.id"""
tex = spark.sql(query)
tex.show(truncate=False)
```
Here is the plan:
```
== Physical Plan ==
*(6) Sort [id#37L ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(id#37L ASC NULLS FIRST, 4), ENSURE_REQUIREMENTS, [id=#112]
   +- *(5) Project [id#37L]
      +- *(5) SortMergeJoin [id#37L, dep#38], [id#39L, dep#40], Inner
         :- *(2) Sort [id#37L ASC NULLS FIRST, dep#38 ASC NULLS FIRST], false, 0
         :  +- Exchange hashpartitioning(id#37L, dep#38, 4), ENSURE_REQUIREMENTS, [id=#96]
         :     +- *(1) Filter (isnotnull(id#37L) AND isnotnull(dep#38))
         :        +- BatchScan[id#37L, dep#38] default.core.tbl1 [filters=id IS NOT NULL, dep IS NOT NULL]
         +- *(4) Sort [id#39L ASC NULLS FIRST, dep#40 ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(id#39L, dep#40, 4), ENSURE_REQUIREMENTS, [id=#104]
               +- *(3) Filter (isnotnull(id#39L) AND isnotnull(dep#40))
                  +- BatchScan[id#39L, dep#40] default.core.tbl2 [filters=id IS NOT NULL, dep IS NOT NULL]
```
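A quick way to count the Exchange operators from the same `tex` DataFrame (a minimal sketch, added for illustration):
```
# Minimal sketch: count Exchange operators in the plan text produced above.
# With SPJ, the two hash-partitioning Exchanges feeding the join should disappear,
# leaving only the range-partitioning Exchange for the ORDER BY.
plan_text = tex.collect()[0][0]
print(plan_text.count("Exchange"))  # currently prints 3
```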