Kontinuation commented on code in PR #1511: URL: https://github.com/apache/datafusion-comet/pull/1511#discussion_r2017870742
##########
native/core/src/execution/shuffle/shuffle_writer.rs:
##########
@@ -314,65 +356,45 @@ impl ShuffleRepartitioner {
         codec: CompressionCodec,
         enable_fast_encoding: bool,
     ) -> Result<Self> {
-        let mut hashes_buf = Vec::with_capacity(batch_size);
-        let mut partition_ids = Vec::with_capacity(batch_size);
-
-        // Safety: `hashes_buf` will be filled with valid values before being used.
-        // `partition_ids` will be filled with valid values before being used.
-        unsafe {
-            hashes_buf.set_len(batch_size);
-            partition_ids.set_len(batch_size);
-        }
+        let num_output_partitions = partitioning.partition_count();

Review Comment:
   Added the assertion.

##########
native/core/src/execution/shuffle/shuffle_writer.rs:
##########
@@ -283,24 +299,50 @@ impl ShuffleRepartitionerMetrics {
     }
 }
-struct ShuffleRepartitioner {
+#[async_trait::async_trait]
+trait ShufflePartitioner: Send + Sync {
+    /// Insert a batch into the partitioner
+    async fn insert_batch(&mut self, batch: RecordBatch) -> Result<()>;
+    /// Write shuffle data and shuffle index file to disk
+    async fn shuffle_write(&mut self) -> Result<()>;
+}
+
+/// A partitioner that uses a hash function to partition data into multiple partitions
+struct MultiPartitionShuffleRepartitioner {
     output_data_file: String,
     output_index_file: String,
-    schema: SchemaRef,
-    buffered_partitions: Vec<PartitionBuffer>,
+    buffered_batches: Vec<RecordBatch>,
+    partition_indices: Vec<Vec<(u32, u32)>>,
+    partition_writers: Vec<PartitionWriter>,
+    shuffle_block_writer: ShuffleBlockWriter,
     /// Partitioning scheme to use
     partitioning: Partitioning,
     runtime: Arc<RuntimeEnv>,
     metrics: ShuffleRepartitionerMetrics,
-    /// Hashes for each row in the current batch
-    hashes_buf: Vec<u32>,
-    /// Partition ids for each row in the current batch
-    partition_ids: Vec<u64>,
+    /// Reused scratch space for computing partition indices
+    scratch: ScratchSpace,
     /// The configured batch size
     batch_size: usize,
+    /// Reservation for repartitioning
+    reservation: MemoryReservation,
+}
+
+#[derive(Default)]
+struct ScratchSpace {
+    /// Hashes for each row in the current batch.
+    hashes_buf: Vec<u32>,
+    /// Partition ids for each row in the current batch.
+    partition_ids: Vec<u32>,
+    /// The row indices of the rows in each partition. This array is conceptually dividied into

Review Comment:
   Fixed typo.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org