milenkovicm commented on code in PR #1380:
URL: 
https://github.com/apache/datafusion-ballista/pull/1380#discussion_r2702347954


##########
ballista/core/src/execution_plans/shuffle_reader.rs:
##########
@@ -162,16 +165,17 @@ impl ExecutionPlan for ShuffleReaderExec {
         let max_message_size = config.ballista_grpc_client_max_message_size();
         let force_remote_read = 
config.ballista_shuffle_reader_force_remote_read();
         let prefer_flight = 
config.ballista_shuffle_reader_remote_prefer_flight();
+        let batch_size = config.batch_size();
 
         if force_remote_read {
             debug!(
-                "All shuffle partitions will be read as remote partitions! To 
disable this behavior set: `{}=false`",
-                crate::config::BALLISTA_SHUFFLE_READER_FORCE_REMOTE_READ
-            );
+            "All shuffle partitions will be read as remote partitions! To 
disable this behavior set: `{}=false`",
+            crate::config::BALLISTA_SHUFFLE_READER_FORCE_REMOTE_READ
+        );
         }
 
         log::debug!(
-            "ShuffleReaderExec::execute({task_id}) max_request_num: 
{max_request_num}, max_message_size: {max_message_size}"
+            "ShuffleReaderExec::execute({task_id}) max_request_num: 
{max_request_num}, max_message_size: {max_message_size}, batch_size: 
{batch_size}"

Review Comment:
   Do we need the batch size here? It's part of the configuration, so we can 
find it out if needed.



##########
ballista/core/src/execution_plans/shuffle_reader.rs:
##########
@@ -197,11 +201,22 @@ impl ExecutionPlan for ShuffleReaderExec {
             prefer_flight,
         );
 
-        let result = RecordBatchStreamAdapter::new(
-            Arc::new(self.schema.as_ref().clone()),
+        let input_stream = Box::pin(RecordBatchStreamAdapter::new(
+            self.schema.clone(),
             response_receiver.try_flatten(),
-        );
-        Ok(Box::pin(result))
+        ));
+
+        Ok(Box::pin(CoalescedShuffleReaderStream {
+            schema: self.schema.clone(),
+            input: input_stream,
+            coalescer: LimitedBatchCoalescer::new(

Review Comment:
   Maybe we could add a limit to ShuffleReader, defaulting to None, and add a 
`with_limit` option, so that if we find a use case we can set it?



##########
ballista/core/src/execution_plans/shuffle_reader.rs:
##########
@@ -197,11 +201,22 @@ impl ExecutionPlan for ShuffleReaderExec {
             prefer_flight,
         );
 
-        let result = RecordBatchStreamAdapter::new(
-            Arc::new(self.schema.as_ref().clone()),
+        let input_stream = Box::pin(RecordBatchStreamAdapter::new(
+            self.schema.clone(),
             response_receiver.try_flatten(),
-        );
-        Ok(Box::pin(result))
+        ));
+
+        Ok(Box::pin(CoalescedShuffleReaderStream {
+            schema: self.schema.clone(),
+            input: input_stream,
+            coalescer: LimitedBatchCoalescer::new(

Review Comment:
   What is the reason for using `LimitedBatchCoalescer` instead of 
`BatchCoalescer`, given that there is no fetch limit?
   
   Would there be a case where we could use a limit 🤔?



##########
ballista/core/src/execution_plans/shuffle_reader.rs:
##########
@@ -1017,10 +1102,177 @@ mod tests {
         .unwrap()
     }
 
+    fn create_custom_test_batch(rows: usize) -> RecordBatch {
+        let schema = create_test_schema();
+
+        // 1. Create number column (0, 1, 2, ..., rows-1)
+        let number_vec: Vec<u32> = (0..rows as u32).collect();
+        let number_array = UInt32Array::from(number_vec);
+
+        // 2. Create string column ("s0", "s1", ..., "s{rows-1}")
+        // Just to fill data, the content is not important
+        let string_vec: Vec<String> = (0..rows).map(|i| format!("s{}", 
i)).collect();
+        let string_array = StringArray::from(string_vec);
+
+        RecordBatch::try_new(schema, vec![Arc::new(number_array), 
Arc::new(string_array)])
+            .unwrap()
+    }
+
     fn create_test_schema() -> SchemaRef {
         Arc::new(Schema::new(vec![
             Field::new("number", DataType::UInt32, true),
             Field::new("str", DataType::Utf8, true),
         ]))
     }
+
+    use datafusion::physical_plan::memory::MemoryStream;
+
+    #[tokio::test]
+    async fn test_coalesce_stream_logic() -> Result<()> {
+        // 1. Create test data - 10 small batches, each with 3 rows
+        let schema = create_test_schema();
+        let small_batch = create_test_batch();
+        let batches = vec![small_batch.clone(); 10];
+
+        // 2. Create mock upstream stream (Input Stream)
+        let input_stream = MemoryStream::try_new(batches, schema.clone(), 
None)?;
+        let input_stream = Box::pin(input_stream) as SendableRecordBatchStream;
+
+        // 3. Configure Coalescer: target batch size to 10 rows
+        let target_batch_size = 10;
+
+        // 4. Manually build the CoalescedShuffleReaderStream
+        let coalesced_stream = CoalescedShuffleReaderStream {

Review Comment:
   I believe adding a `new` method would make sense.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to