danielhumanmod commented on code in PR #1380:
URL:
https://github.com/apache/datafusion-ballista/pull/1380#discussion_r2710922716
##########
ballista/core/src/execution_plans/shuffle_reader.rs:
##########
@@ -1017,10 +1102,177 @@ mod tests {
.unwrap()
}
+ fn create_custom_test_batch(rows: usize) -> RecordBatch {
+ let schema = create_test_schema();
+
+ // 1. Create number column (0, 1, 2, ..., rows-1)
+ let number_vec: Vec<u32> = (0..rows as u32).collect();
+ let number_array = UInt32Array::from(number_vec);
+
+ // 2. Create string column ("s0", "s1", ..., "s{rows-1}")
+ // Just to fill data, the content is not important
+ let string_vec: Vec<String> = (0..rows).map(|i| format!("s{}",
i)).collect();
+ let string_array = StringArray::from(string_vec);
+
+ RecordBatch::try_new(schema, vec![Arc::new(number_array),
Arc::new(string_array)])
+ .unwrap()
+ }
+
fn create_test_schema() -> SchemaRef {
Arc::new(Schema::new(vec![
Field::new("number", DataType::UInt32, true),
Field::new("str", DataType::Utf8, true),
]))
}
+
+ use datafusion::physical_plan::memory::MemoryStream;
+
+ #[tokio::test]
+ async fn test_coalesce_stream_logic() -> Result<()> {
+ // 1. Create test data - 10 small batches, each with 3 rows
+ let schema = create_test_schema();
+ let small_batch = create_test_batch();
+ let batches = vec![small_batch.clone(); 10];
+
+ // 2. Create mock upstream stream (Input Stream)
+ let input_stream = MemoryStream::try_new(batches, schema.clone(),
None)?;
+ let input_stream = Box::pin(input_stream) as SendableRecordBatchStream;
+
+ // 3. Configure Coalescer: target batch size to 10 rows
+ let target_batch_size = 10;
+
+ // 4. Manually build the CoalescedShuffleReaderStream
+ let coalesced_stream = CoalescedShuffleReaderStream {
Review Comment:
Good point, will add, thanks!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]