This is an automated email from the ASF dual-hosted git repository.
ozankabak pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 8216e32e87 [MINOR]: Fix some minor silent bugs (#11127)
8216e32e87 is described below
commit 8216e32e87b2238d8814fe16215c8770d6c327c8
Author: Mustafa Akur <[email protected]>
AuthorDate: Thu Jun 27 17:10:31 2024 +0300
[MINOR]: Fix some minor silent bugs (#11127)
---
datafusion/core/tests/fifo/mod.rs | 59 +++++++++++++-------------
datafusion/core/tests/tpcds_planning.rs | 5 ++-
datafusion/physical-expr/src/partitioning.rs | 4 +-
datafusion/physical-plan/src/aggregates/mod.rs | 2 +-
4 files changed, 38 insertions(+), 32 deletions(-)
diff --git a/datafusion/core/tests/fifo/mod.rs b/datafusion/core/tests/fifo/mod.rs
index 2e21abffab..1df97b1636 100644
--- a/datafusion/core/tests/fifo/mod.rs
+++ b/datafusion/core/tests/fifo/mod.rs
@@ -217,17 +217,6 @@ mod unix_test {
.set_bool("datafusion.execution.coalesce_batches", false)
.with_target_partitions(1);
let ctx = SessionContext::new_with_config(config);
- // Tasks
- let mut tasks: Vec<JoinHandle<()>> = vec![];
-
- // Join filter
- let a1_iter = 0..TEST_DATA_SIZE;
- // Join key
- let a2_iter = (0..TEST_DATA_SIZE).map(|x| x % 10);
- let lines = a1_iter
- .zip(a2_iter)
- .map(|(a1, a2)| format!("{a1},{a2}\n"))
- .collect::<Vec<_>>();
// Create a new temporary FIFO file
let tmp_dir = TempDir::new()?;
@@ -238,22 +227,6 @@ mod unix_test {
// Create a mutex for tracking if the right input source is waiting for data.
let waiting = Arc::new(AtomicBool::new(true));
- // Create writing threads for the left and right FIFO files
- tasks.push(create_writing_thread(
- left_fifo.clone(),
- "a1,a2\n".to_owned(),
- lines.clone(),
- waiting.clone(),
- TEST_BATCH_SIZE,
- ));
- tasks.push(create_writing_thread(
- right_fifo.clone(),
- "a1,a2\n".to_owned(),
- lines.clone(),
- waiting.clone(),
- TEST_BATCH_SIZE,
- ));
-
// Create schema
let schema = Arc::new(Schema::new(vec![
Field::new("a1", DataType::UInt32, false),
@@ -264,10 +237,10 @@ mod unix_test {
let order = vec![vec![datafusion_expr::col("a1").sort(true, false)]];
// Set unbounded sorted files read configuration
- let provider = fifo_table(schema.clone(), left_fifo, order.clone());
+ let provider = fifo_table(schema.clone(), left_fifo.clone(), order.clone());
ctx.register_table("left", provider)?;
- let provider = fifo_table(schema.clone(), right_fifo, order);
+ let provider = fifo_table(schema.clone(), right_fifo.clone(), order);
ctx.register_table("right", provider)?;
// Execute the query, with no matching rows. (since key is modulus 10)
@@ -287,6 +260,34 @@ mod unix_test {
.await?;
let mut stream = df.execute_stream().await?;
let mut operations = vec![];
+
+ // Tasks
+ let mut tasks: Vec<JoinHandle<()>> = vec![];
+
+ // Join filter
+ let a1_iter = 0..TEST_DATA_SIZE;
+ // Join key
+ let a2_iter = (0..TEST_DATA_SIZE).map(|x| x % 10);
+ let lines = a1_iter
+ .zip(a2_iter)
+ .map(|(a1, a2)| format!("{a1},{a2}\n"))
+ .collect::<Vec<_>>();
+
+ // Create writing threads for the left and right FIFO files
+ tasks.push(create_writing_thread(
+ left_fifo,
+ "a1,a2\n".to_owned(),
+ lines.clone(),
+ waiting.clone(),
+ TEST_BATCH_SIZE,
+ ));
+ tasks.push(create_writing_thread(
+ right_fifo,
+ "a1,a2\n".to_owned(),
+ lines.clone(),
+ waiting.clone(),
+ TEST_BATCH_SIZE,
+ ));
// Partial.
while let Some(Ok(batch)) = stream.next().await {
waiting.store(false, Ordering::SeqCst);
diff --git a/datafusion/core/tests/tpcds_planning.rs b/datafusion/core/tests/tpcds_planning.rs
index 44fb0afff3..b99bc26800 100644
--- a/datafusion/core/tests/tpcds_planning.rs
+++ b/datafusion/core/tests/tpcds_planning.rs
@@ -1044,7 +1044,10 @@ async fn regression_test(query_no: u8, create_physical: bool) -> Result<()> {
for table in &tables {
ctx.register_table(
table.name.as_str(),
- Arc::new(MemTable::try_new(Arc::new(table.schema.clone()), vec![])?),
+ Arc::new(MemTable::try_new(
+ Arc::new(table.schema.clone()),
+ vec![vec![]],
+ )?),
)?;
}
diff --git a/datafusion/physical-expr/src/partitioning.rs b/datafusion/physical-expr/src/partitioning.rs
index fcb3278b60..273c77fb1d 100644
--- a/datafusion/physical-expr/src/partitioning.rs
+++ b/datafusion/physical-expr/src/partitioning.rs
@@ -152,6 +152,8 @@ impl Partitioning {
match required {
Distribution::UnspecifiedDistribution => true,
Distribution::SinglePartition if self.partition_count() == 1 => true,
+ // When partition count is 1, hash requirement is satisfied.
+ Distribution::HashPartitioned(_) if self.partition_count() == 1 => true,
Distribution::HashPartitioned(required_exprs) => {
match self {
// Here we do not check the partition count for hash partitioning and assumes the partition count
@@ -290,7 +292,7 @@ mod tests {
assert_eq!(result, (true, false, false, false, false))
}
Distribution::HashPartitioned(_) => {
- assert_eq!(result, (false, false, false, true, false))
+ assert_eq!(result, (true, false, false, true, false))
}
}
}
diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs
index 4c187f03f3..533d10357b 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -675,7 +675,7 @@ impl ExecutionPlan for AggregateExec {
vec![Distribution::UnspecifiedDistribution]
}
AggregateMode::FinalPartitioned | AggregateMode::SinglePartitioned => {
- vec![Distribution::HashPartitioned(self.output_group_expr())]
+ vec![Distribution::HashPartitioned(self.group_by.input_exprs())]
}
AggregateMode::Final | AggregateMode::Single => {
vec![Distribution::SinglePartition]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]