mbutrovich commented on code in PR #1511:
URL: https://github.com/apache/datafusion-comet/pull/1511#discussion_r2017297638


##########
native/core/src/execution/shuffle/shuffle_writer.rs:
##########
@@ -498,81 +502,63 @@ impl ShuffleRepartitioner {
         Ok(())
     }
 
-    /// Writes buffered shuffled record batches into Arrow IPC bytes.
-    async fn shuffle_write(&mut self) -> Result<SendableRecordBatchStream> {
-        let mut elapsed_compute = 
self.metrics.baseline.elapsed_compute().timer();
-        let buffered_partitions = &mut self.buffered_partitions;
-        let num_output_partitions = buffered_partitions.len();
-        let mut output_batches: Vec<Vec<u8>> = vec![vec![]; 
num_output_partitions];
-        let mut offsets = vec![0; num_output_partitions + 1];
-        for i in 0..num_output_partitions {
-            buffered_partitions[i].flush(&self.metrics)?;
-            output_batches[i] = std::mem::take(&mut 
buffered_partitions[i].frozen);
-        }
-
-        let data_file = self.output_data_file.clone();
-        let index_file = self.output_index_file.clone();
-
-        let mut write_time = self.metrics.write_time.timer();
-
-        let output_data = OpenOptions::new()
-            .write(true)
-            .create(true)
-            .truncate(true)
-            .open(data_file)
-            .map_err(|e| DataFusionError::Execution(format!("shuffle write 
error: {:?}", e)))?;
-
-        let mut output_data = BufWriter::new(output_data);
-
-        for i in 0..num_output_partitions {
-            offsets[i] = output_data.stream_position()?;
-            output_data.write_all(&output_batches[i])?;
-            output_batches[i].clear();
+    async fn buffer_partitioned_batch_may_spill(
+        &mut self,
+        input: RecordBatch,
+        shuffled_partition_ids: &[u32],
+        partition_starts: &[u32],
+    ) -> Result<()> {
+        let mut mem_growth: usize = input.get_array_memory_size();
+        let buffered_partition_idx = self.buffered_batches.len() as u32;
+        self.buffered_batches.push(input);
 
-            // if we wrote a spill file for this partition then copy the
-            // contents into the shuffle file
-            if let Some(spill_data) = 
self.buffered_partitions[i].spill_file.as_ref() {
-                let mut spill_file = BufReader::new(
-                    
File::open(spill_data.temp_file.path()).map_err(Self::to_df_err)?,
-                );
-                std::io::copy(&mut spill_file, &mut 
output_data).map_err(Self::to_df_err)?;
+        for (partition_id, (&start, &end)) in partition_starts

Review Comment:
   Could you add high level documentation what this chunk of code is doing? The 
combination of `for` with the `filter` and a nested `for` make it non-obvious.



##########
native/core/src/execution/shuffle/shuffle_writer.rs:
##########
@@ -283,24 +299,50 @@ impl ShuffleRepartitionerMetrics {
     }
 }
 
-struct ShuffleRepartitioner {
+#[async_trait::async_trait]
+trait ShufflePartitioner: Send + Sync {
+    /// Insert a batch into the partitioner
+    async fn insert_batch(&mut self, batch: RecordBatch) -> Result<()>;
+    /// Write shuffle data and shuffle index file to disk
+    async fn shuffle_write(&mut self) -> Result<()>;
+}
+
+/// A partitioner that uses a hash function to partition data into multiple 
partitions
+struct MultiPartitionShuffleRepartitioner {
     output_data_file: String,
     output_index_file: String,
-    schema: SchemaRef,
-    buffered_partitions: Vec<PartitionBuffer>,
+    buffered_batches: Vec<RecordBatch>,
+    partition_indices: Vec<Vec<(u32, u32)>>,
+    partition_writers: Vec<PartitionWriter>,
+    shuffle_block_writer: ShuffleBlockWriter,
     /// Partitioning scheme to use
     partitioning: Partitioning,
     runtime: Arc<RuntimeEnv>,
     metrics: ShuffleRepartitionerMetrics,
-    /// Hashes for each row in the current batch
-    hashes_buf: Vec<u32>,
-    /// Partition ids for each row in the current batch
-    partition_ids: Vec<u64>,
+    /// Reused scratch space for computing partition indices
+    scratch: ScratchSpace,
     /// The configured batch size
     batch_size: usize,
+    /// Reservation for repartitioning
+    reservation: MemoryReservation,
+}
+
+#[derive(Default)]
+struct ScratchSpace {
+    /// Hashes for each row in the current batch.
+    hashes_buf: Vec<u32>,
+    /// Partition ids for each row in the current batch.
+    partition_ids: Vec<u32>,
+    /// The row indices of the rows in each partition. This array is 
conceptually dividied into

Review Comment:
   Typo: `dividied`



##########
native/core/src/execution/shuffle/shuffle_writer.rs:
##########
@@ -314,65 +356,45 @@ impl ShuffleRepartitioner {
         codec: CompressionCodec,
         enable_fast_encoding: bool,
     ) -> Result<Self> {
-        let mut hashes_buf = Vec::with_capacity(batch_size);
-        let mut partition_ids = Vec::with_capacity(batch_size);
-
-        // Safety: `hashes_buf` will be filled with valid values before being 
used.
-        // `partition_ids` will be filled with valid values before being used.
-        unsafe {
-            hashes_buf.set_len(batch_size);
-            partition_ids.set_len(batch_size);
-        }
+        let num_output_partitions = partitioning.partition_count();

Review Comment:
   Is `assert_ne(num_output_partitions, 1, "Use 
SinglePartitionShufflePartitioner for 1 output partition.")` a valid assertion 
to add?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to