alamb commented on code in PR #5632:
URL: https://github.com/apache/arrow-datafusion/pull/5632#discussion_r1141007193
##########
datafusion/core/src/physical_plan/joins/sort_merge_join.rs:
##########
@@ -2212,4 +2258,135 @@ mod tests {
assert_batches_sorted_eq!(expected, &batches);
Ok(())
}
+
+ #[tokio::test]
+ async fn overallocation_single_batch() -> Result<()> {
+ let left = build_table(
+ ("a1", &vec![0, 1, 2, 3, 4, 5]),
+ ("b1", &vec![1, 2, 3, 4, 5, 6]),
+ ("c1", &vec![4, 5, 6, 7, 8, 9]),
+ );
+ let right = build_table(
+ ("a2", &vec![0, 10, 20, 30, 40]),
+ ("b2", &vec![1, 3, 4, 6, 8]),
+ ("c2", &vec![50, 60, 70, 80, 90]),
+ );
+ let on = vec![(
+ Column::new_with_schema("b1", &left.schema())?,
+ Column::new_with_schema("b2", &right.schema())?,
+ )];
+ let sort_options = vec![SortOptions::default(); on.len()];
+
+ let join_types = vec![
+ JoinType::Inner,
+ JoinType::Left,
+ JoinType::Right,
+ JoinType::Full,
+ JoinType::LeftSemi,
+ JoinType::LeftAnti,
+ ];
+
+ for join_type in join_types {
+ let runtime_config = RuntimeConfig::new().with_memory_limit(100,
1.0);
+ let runtime = Arc::new(RuntimeEnv::new(runtime_config)?);
+ let session_config = SessionConfig::default().with_batch_size(50);
+ let session_ctx = SessionContext::with_config_rt(session_config,
runtime);
+ let task_ctx = session_ctx.task_ctx();
+ let join = join_with_options(
+ left.clone(),
+ right.clone(),
+ on.clone(),
+ join_type,
+ sort_options.clone(),
+ false,
+ )?;
+
+ let stream = join.execute(0, task_ctx)?;
+ let err = common::collect(stream).await.unwrap_err();
+
+ assert_contains!(
+ err.to_string(),
+ "Resources exhausted: Failed to allocate additional"
Review Comment:
Thank you for the unit test
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]