This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 6de0796b4 Ensure the row count is preserved when coalescing over empty records (#3439)
6de0796b4 is described below
commit 6de0796b44766a8aef308b09706cddc9609801f0
Author: Batuhan Taskaya <[email protected]>
AuthorDate: Mon Sep 12 21:04:28 2022 +0300
Ensure the row count is preserved when coalescing over empty records (#3439)
* Ensure the row count is preserved when coalescing over empty records
* Explain the reasoning for not using optimized_plan in tests
---
datafusion/core/src/physical_plan/coalesce_batches.rs | 8 ++++++--
datafusion/core/tests/sql/mod.rs | 14 +++++++++++---
2 files changed, 17 insertions(+), 5 deletions(-)
diff --git a/datafusion/core/src/physical_plan/coalesce_batches.rs b/datafusion/core/src/physical_plan/coalesce_batches.rs
index a257ccf09..f2edacd90 100644
--- a/datafusion/core/src/physical_plan/coalesce_batches.rs
+++ b/datafusion/core/src/physical_plan/coalesce_batches.rs
@@ -33,7 +33,7 @@ use crate::execution::context::TaskContext;
use arrow::compute::kernels::concat::concat;
use arrow::datatypes::SchemaRef;
use arrow::error::Result as ArrowResult;
-use arrow::record_batch::RecordBatch;
+use arrow::record_batch::{RecordBatch, RecordBatchOptions};
use futures::stream::{Stream, StreamExt};
use log::trace;
@@ -291,7 +291,11 @@ pub fn concat_batches(
batches.len(),
row_count
);
- RecordBatch::try_new(schema.clone(), arrays)
+
+ let mut options = RecordBatchOptions::default();
+ options.row_count = Some(row_count);
+
+ RecordBatch::try_new_with_options(schema.clone(), arrays, &options)
}
#[cfg(test)]
diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs
index c16386c5d..ff419f3f2 100644
--- a/datafusion/core/tests/sql/mod.rs
+++ b/datafusion/core/tests/sql/mod.rs
@@ -770,14 +770,22 @@ async fn execute_to_batches(ctx: &SessionContext, sql: &str) -> Vec<RecordBatch>
.unwrap();
let logical_schema = plan.schema();
+    // We are not really interested in the direct output of optimized_logical_plan
+    // since the physical plan construction already optimizes the given logical plan
+    // and we want to avoid double-optimization as a consequence. So we just construct
+    // it here to make sure that it doesn't fail at this step and get the optimized
+    // schema (to assert later that the logical and optimized schemas are the same).
let msg = format!("Optimizing logical plan for '{}': {:?}", sql, plan);
- let plan = ctx
+ let optimized_logical_plan = ctx
.optimize(&plan)
.map_err(|e| format!("{:?} at {}", e, msg))
.unwrap();
- let optimized_logical_schema = plan.schema();
+ let optimized_logical_schema = optimized_logical_plan.schema();
- let msg = format!("Creating physical plan for '{}': {:?}", sql, plan);
+ let msg = format!(
+ "Creating physical plan for '{}': {:?}",
+ sql, optimized_logical_plan
+ );
let plan = ctx
.create_physical_plan(&plan)
.await