Kontinuation commented on PR #1862:
URL: https://github.com/apache/datafusion-comet/pull/1862#issuecomment-2985113515
This implementation of RangePartitioning may be incorrect. RangePartitioning
should split the input DataFrame into partitions covering consecutive,
non-overlapping ranges; this requires scanning the entire DataFrame to obtain
the range bounds of each partition before performing the actual shuffle write.
Here is PySpark code illustrating the difference between the behavior of
Comet and vanilla Spark.
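For reference, the usual way to get such bounds is to sample the input, pick evenly spaced split points from the sorted sample, and route each row by binary search over those split points. The sketch below is a simplified illustration of that idea (all names and the sampling strategy are my own, not Spark's or Comet's actual implementation):

```python
import bisect
import random

def compute_range_bounds(rows, num_partitions, sample_size=100):
    # Estimate the value distribution from a random sample of the input
    # (Spark samples each input partition; a plain random sample here).
    sample = sorted(random.sample(rows, min(sample_size, len(rows))))
    # Pick num_partitions - 1 evenly spaced split points from the sample.
    step = len(sample) / num_partitions
    return [sample[int(step * i)] for i in range(1, num_partitions)]

def partition_id(bounds, value):
    # Binary search over the split points: partition i receives the values
    # in (bounds[i-1], bounds[i]], so the resulting ranges cannot overlap.
    return bisect.bisect_left(bounds, value)

rows = list(range(100000))
bounds = compute_range_bounds(rows, 10)
parts = {}
for v in rows:
    parts.setdefault(partition_id(bounds, v), []).append(v)

# Every partition covers a consecutive, non-overlapping range of ids.
for i in sorted(parts):
    print(i, min(parts[i]), max(parts[i]))
```

Because every row is routed through the same global split points, the non-overlap property holds regardless of how the input was distributed across tasks.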
```python
spark.range(0, 100000).write.format("parquet").mode("overwrite").save("range-partitioning")
df = spark.read.parquet("range-partitioning")
df_range_partitioned = df.repartitionByRange(10, "id")
df_range_partitioned.explain()

# Show the min and max of each partition
def get_partition_bounds(idx, iterator):
    min_id = None
    max_id = None
    for row in iterator:
        if min_id is None or row.id < min_id:
            min_id = row.id
        if max_id is None or row.id > max_id:
            max_id = row.id
    yield idx, min_id, max_id

partition_bounds = df_range_partitioned.rdd.mapPartitionsWithIndex(get_partition_bounds).collect()

# Print the results
for partition_id, min_id, max_id in sorted(partition_bounds):
    print(f"Partition {partition_id}: min_id={min_id}, max_id={max_id}")
```
**Comet**:
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- CometExchange rangepartitioning(id#17L ASC NULLS FIRST, 10), REPARTITION_BY_NUM, CometNativeShuffle, [plan_id=173]
   +- CometScan parquet [id#17L] Batched: true, DataFilters: [], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/path/to/range-p..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
Partition 0: min_id=0, max_id=90799
Partition 1: min_id=753, max_id=91680
Partition 2: min_id=1527, max_id=92520
Partition 3: min_id=2399, max_id=93284
Partition 4: min_id=3274, max_id=94123
Partition 5: min_id=4053, max_id=94844
Partition 6: min_id=4851, max_id=95671
Partition 7: min_id=5738, max_id=96522
Partition 8: min_id=6571, max_id=97335
Partition 9: min_id=7408, max_id=99999
```
**Spark**:
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Exchange rangepartitioning(id#20L ASC NULLS FIRST, 10), REPARTITION_BY_NUM, [plan_id=197]
   +- FileScan parquet [id#20L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/path/to/range-p..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
Partition 0: min_id=0, max_id=9974
Partition 1: min_id=9975, max_id=19981
Partition 2: min_id=19982, max_id=29993
Partition 3: min_id=29994, max_id=39997
Partition 4: min_id=39998, max_id=49959
Partition 5: min_id=49960, max_id=59995
Partition 6: min_id=59996, max_id=69898
Partition 7: min_id=69899, max_id=79970
Partition 8: min_id=79971, max_id=89976
Partition 9: min_id=89977, max_id=99999
```
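The overlap in the Comet output can also be checked mechanically. A small helper (a hypothetical name, not part of either codebase) that validates the `(partition_id, min, max)` tuples collected by the snippet above, shown here on the first few rows of each output:

```python
def ranges_are_disjoint(partition_bounds):
    # partition_bounds: (partition_id, min_value, max_value) tuples, as
    # produced by the mapPartitionsWithIndex snippet above.
    bounds = sorted(b for b in partition_bounds if b[1] is not None)
    # Each partition's max must fall below the next partition's min.
    return all(prev[2] < nxt[1] for prev, nxt in zip(bounds, bounds[1:]))

# First three partitions from each output above.
comet = [(0, 0, 90799), (1, 753, 91680), (2, 1527, 92520)]
spark = [(0, 0, 9974), (1, 9975, 19981), (2, 19982, 29993)]
print(ranges_are_disjoint(comet))  # False: Comet's ranges overlap
print(ranges_are_disjoint(spark))  # True: consecutive, non-overlapping
```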
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]