This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 9152dc2c2 chore: Remove num partitions from repartitioner (#1498)
9152dc2c2 is described below
commit 9152dc2c23276023708237d7aa857c1da1b2807b
Author: Emily Matheys <[email protected]>
AuthorDate: Tue Mar 11 21:16:28 2025 +0200
chore: Remove num partitions from repartitioner (#1498)
* chore: Remove num partitions from repartitioner
treat all single partition schemes the same way
remove duplicate messaging by using assert_eq instead of assert
* rebased and resolved conflicts
---------
Co-authored-by: Emily Matheys <[email protected]>
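
For context, the heart of this change is collapsing the two single-partition cases (a hash partitioning with a partition count of 1 and UnknownPartitioning(1)) into one match arm guarded on partition_count(), and switching to assert_eq!, which prints both operands on failure so the message no longer has to repeat the observed value. A minimal standalone sketch of that pattern follows, with names simplified; these are not the actual Comet types:

    #[derive(Debug)]
    enum Partitioning {
        Hash(Vec<String>, usize), // partition exprs simplified to strings here
        UnknownPartitioning(usize),
    }

    impl Partitioning {
        fn partition_count(&self) -> usize {
            match self {
                Partitioning::Hash(_, n) | Partitioning::UnknownPartitioning(n) => *n,
            }
        }
    }

    fn route(partitioning: &Partitioning, buffered_partitions: usize) {
        match partitioning {
            // Any scheme that produces a single output partition takes the
            // same fast path; assert_eq! prints both operands on failure, so
            // the message no longer needs to embed the observed value.
            any if any.partition_count() == 1 => {
                assert_eq!(buffered_partitions, 1, "Expected 1 partition");
                // append every row to partition 0 ...
            }
            Partitioning::Hash(_exprs, num_output_partitions) => {
                // hash each row into one of `num_output_partitions` buckets ...
                let _ = num_output_partitions;
            }
            other => unreachable!("unsupported partitioning: {other:?}"),
        }
    }

    fn main() {
        route(&Partitioning::UnknownPartitioning(1), 1);
        route(&Partitioning::Hash(vec!["a".into()], 16), 16);
    }

Because the guard arm is listed first, it shadows the Hash arm whenever the count is 1, which is what lets the Hash arm assume a real multi-partition shuffle.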
---
.../core/src/execution/shuffle/shuffle_writer.rs | 46 +++++++++-------------
1 file changed, 18 insertions(+), 28 deletions(-)
diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs b/native/core/src/execution/shuffle/shuffle_writer.rs
index f1c2b4d78..9f6c7e406 100644
--- a/native/core/src/execution/shuffle/shuffle_writer.rs
+++ b/native/core/src/execution/shuffle/shuffle_writer.rs
@@ -290,7 +290,6 @@ struct ShuffleRepartitioner {
     buffered_partitions: Vec<PartitionBuffer>,
     /// Partitioning scheme to use
     partitioning: Partitioning,
-    num_output_partitions: usize,
     runtime: Arc<RuntimeEnv>,
     metrics: ShuffleRepartitionerMetrics,
     /// Hashes for each row in the current batch
@@ -315,8 +314,6 @@ impl ShuffleRepartitioner {
         codec: CompressionCodec,
         enable_fast_encoding: bool,
     ) -> Result<Self> {
-        let num_output_partitions = partitioning.partition_count();
-
         let mut hashes_buf = Vec::with_capacity(batch_size);
         let mut partition_ids = Vec::with_capacity(batch_size);
@@ -336,7 +333,7 @@ impl ShuffleRepartitioner {
             output_data_file,
             output_index_file,
             schema: Arc::clone(&schema),
-            buffered_partitions: (0..num_output_partitions)
+            buffered_partitions: (0..partitioning.partition_count())
                 .map(|_| {
                     PartitionBuffer::try_new(
                         Arc::clone(&schema),
@@ -348,7 +345,6 @@ impl ShuffleRepartitioner {
                 })
                 .collect::<Result<Vec<_>>>()?,
             partitioning,
-            num_output_partitions,
             runtime,
             metrics,
             hashes_buf,
@@ -401,9 +397,21 @@ impl ShuffleRepartitioner {
         // number of rows those are written to output data file.
         self.metrics.baseline.record_output(input.num_rows());

-        let num_output_partitions = self.num_output_partitions;
         match &self.partitioning {
-            Partitioning::Hash(exprs, _) => {
+            any if any.partition_count() == 1 => {
+                let buffered_partitions = &mut self.buffered_partitions;
+
+                assert_eq!(buffered_partitions.len(), 1, "Expected 1 partition");
+
+                // TODO the single partition case could be optimized to avoid appending all
+                // rows from the batch into builders and then recreating the batch
+                // https://github.com/apache/datafusion-comet/issues/1453
+                let indices = (0..input.num_rows()).collect::<Vec<usize>>();
+
+                self.append_rows_to_partition(input.columns(), &indices, 0)
+                    .await?;
+            }
+            Partitioning::Hash(exprs, num_output_partitions) => {
                 let (partition_starts, shuffled_partition_ids): (Vec<usize>, Vec<usize>) = {
                     let mut timer = self.metrics.repart_time.timer();
@@ -423,11 +431,11 @@ impl ShuffleRepartitioner {
                         .iter()
                         .enumerate()
                         .for_each(|(idx, hash)| {
-                            partition_ids[idx] = pmod(*hash, num_output_partitions) as u64
+                            partition_ids[idx] = pmod(*hash, *num_output_partitions) as u64
                         });

                     // count each partition size
-                    let mut partition_counters = vec![0usize; num_output_partitions];
+                    let mut partition_counters = vec![0usize; *num_output_partitions];
                     partition_ids
                         .iter()
                         .for_each(|partition_id| partition_counters[*partition_id as usize] += 1);
@@ -478,24 +486,6 @@ impl ShuffleRepartitioner {
                     .await?;
                 }
             }
-            Partitioning::UnknownPartitioning(n) if *n == 1 => {
-                let buffered_partitions = &mut self.buffered_partitions;
-
-                assert_eq!(
-                    buffered_partitions.len(),
-                    1,
-                    "Expected 1 partition but got {}",
-                    buffered_partitions.len()
-                );
-
-                // TODO the single partition case could be optimized to avoid appending all
-                // rows from the batch into builders and then recreating the batch
-                // https://github.com/apache/datafusion-comet/issues/1453
-                let indices = (0..input.num_rows()).collect::<Vec<usize>>();
-
-                self.append_rows_to_partition(input.columns(), &indices, 0)
-                    .await?;
-            }
             other => {
                 // this should be unreachable as long as the validation logic
                 // in the constructor is kept up-to-date
@@ -511,8 +501,8 @@ impl ShuffleRepartitioner {
     /// Writes buffered shuffled record batches into Arrow IPC bytes.
     async fn shuffle_write(&mut self) -> Result<SendableRecordBatchStream> {
         let mut elapsed_compute = self.metrics.baseline.elapsed_compute().timer();
-        let num_output_partitions = self.num_output_partitions;
         let buffered_partitions = &mut self.buffered_partitions;
+        let num_output_partitions = buffered_partitions.len();
         let mut output_batches: Vec<Vec<u8>> = vec![vec![]; num_output_partitions];
         let mut offsets = vec![0; num_output_partitions + 1];
         for i in 0..num_output_partitions {
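
An aside on the pmod call in the hash arm above: it is a positive modulo, so row hashes that are negative when reinterpreted as signed 32-bit integers (Spark's Murmur3 convention) still map into the range [0, num_output_partitions). A minimal sketch of such a helper, assuming this signature rather than quoting the actual definition in shuffle_writer.rs:

    /// Positive modulo: maps a hash (interpreted as i32, so possibly
    /// negative) into [0, n). Assumes n fits in an i32.
    fn pmod(hash: u32, n: usize) -> usize {
        let r = (hash as i32) % (n as i32);
        if r < 0 {
            (r + n as i32) as usize
        } else {
            r as usize
        }
    }

    fn main() {
        assert_eq!(pmod(0xFFFF_FFFF, 10), 9); // hash -1 as i32 wraps to bucket 9
        assert_eq!(pmod(7, 10), 7);
    }

For example, pmod(0xFFFFFFFF, 10) treats the hash as -1 and returns 9 rather than an out-of-range index.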
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]