This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 741df36 Remove DataFusionError::into_arrow_external_error (#1645)
741df36 is described below
commit 741df36fa0e401ec61d5441590728bc653ae0924
Author: Andrew Lamb <[email protected]>
AuthorDate: Mon Jan 24 07:06:05 2022 -0500
Remove DataFusionError::into_arrow_external_error (#1645)
---
datafusion/src/error.rs | 9 ------
datafusion/src/physical_plan/cross_join.rs | 3 +-
datafusion/src/physical_plan/filter.rs | 4 +--
datafusion/src/physical_plan/hash_aggregate.rs | 34 +++++++---------------
datafusion/src/physical_plan/projection.rs | 11 ++++---
datafusion/src/physical_plan/repartition.rs | 4 +--
datafusion/src/physical_plan/sorts/sort.rs | 3 +-
.../src/physical_plan/windows/window_agg_exec.rs | 7 ++---
8 files changed, 24 insertions(+), 51 deletions(-)
diff --git a/datafusion/src/error.rs b/datafusion/src/error.rs
index b7a8f45..248f243 100644
--- a/datafusion/src/error.rs
+++ b/datafusion/src/error.rs
@@ -72,15 +72,6 @@ pub enum DataFusionError {
External(GenericError),
}
-impl DataFusionError {
- /// Wraps this [DataFusionError] as an [arrow::error::ArrowError].
- ///
- /// TODO this can be removed in favor if the conversion below
- pub fn into_arrow_external_error(self) -> ArrowError {
- ArrowError::from_external_error(Box::new(self))
- }
-}
-
impl From<io::Error> for DataFusionError {
fn from(e: io::Error) -> Self {
DataFusionError::IoError(e)
diff --git a/datafusion/src/physical_plan/cross_join.rs b/datafusion/src/physical_plan/cross_join.rs
index 087507e..48301f0 100644
--- a/datafusion/src/physical_plan/cross_join.rs
+++ b/datafusion/src/physical_plan/cross_join.rs
@@ -331,8 +331,7 @@ fn build_batch(
let scalar = ScalarValue::try_from_array(arr, left_index)?;
Ok(scalar.to_array_of_size(batch.num_rows()))
})
- .collect::<Result<Vec<_>>>()
- .map_err(|x| x.into_arrow_external_error())?;
+ .collect::<Result<Vec<_>>>()?;
RecordBatch::try_new(
Arc::new(schema.clone()),
diff --git a/datafusion/src/physical_plan/filter.rs b/datafusion/src/physical_plan/filter.rs
index a071d0e..a48d112 100644
--- a/datafusion/src/physical_plan/filter.rs
+++ b/datafusion/src/physical_plan/filter.rs
@@ -176,7 +176,7 @@ fn batch_filter(
predicate
.evaluate(batch)
.map(|v| v.into_array(batch.num_rows()))
- .map_err(DataFusionError::into_arrow_external_error)
+ .map_err(DataFusionError::into)
.and_then(|array| {
array
.as_any()
@@ -185,7 +185,7 @@ fn batch_filter(
DataFusionError::Internal(
"Filter predicate evaluated to non-boolean
value".to_string(),
)
- .into_arrow_external_error()
+ .into()
})
// apply filter array to record batch
.and_then(|filter_array| filter_record_batch(batch,
filter_array))
diff --git a/datafusion/src/physical_plan/hash_aggregate.rs b/datafusion/src/physical_plan/hash_aggregate.rs
index 8074ae3..2b1a59e 100644
--- a/datafusion/src/physical_plan/hash_aggregate.rs
+++ b/datafusion/src/physical_plan/hash_aggregate.rs
@@ -403,8 +403,7 @@ fn group_aggregate_batch(
}
// 1.2 Need to create new entry
None => {
- let accumulator_set = create_accumulators(aggr_expr)
- .map_err(DataFusionError::into_arrow_external_error)?;
+ let accumulator_set = create_accumulators(aggr_expr)?;
// Copy group values out of arrays into `ScalarValue`s
let group_by_values = group_values
@@ -516,8 +515,7 @@ async fn compute_grouped_hash_aggregate(
// Assume create_schema() always put group columns in front of aggr
columns, we set
// col_idx_base to group expression count.
let aggregate_expressions =
- aggregate_expressions(&aggr_expr, &mode, group_expr.len())
- .map_err(DataFusionError::into_arrow_external_error)?;
+ aggregate_expressions(&aggr_expr, &mode, group_expr.len())?;
let random_state = RandomState::new();
@@ -535,8 +533,7 @@ async fn compute_grouped_hash_aggregate(
batch,
accumulators,
&aggregate_expressions,
- )
- .map_err(DataFusionError::into_arrow_external_error)?;
+ )?;
timer.done();
}
@@ -754,10 +751,8 @@ async fn compute_hash_aggregate(
elapsed_compute: metrics::Time,
) -> ArrowResult<RecordBatch> {
let timer = elapsed_compute.timer();
- let mut accumulators = create_accumulators(&aggr_expr)
- .map_err(DataFusionError::into_arrow_external_error)?;
- let expressions = aggregate_expressions(&aggr_expr, &mode, 0)
- .map_err(DataFusionError::into_arrow_external_error)?;
+ let mut accumulators = create_accumulators(&aggr_expr)?;
+ let expressions = aggregate_expressions(&aggr_expr, &mode, 0)?;
let expressions = Arc::new(expressions);
timer.done();
@@ -766,16 +761,14 @@ async fn compute_hash_aggregate(
while let Some(batch) = input.next().await {
let batch = batch?;
let timer = elapsed_compute.timer();
- aggregate_batch(&mode, &batch, &mut accumulators, &expressions)
- .map_err(DataFusionError::into_arrow_external_error)?;
+ aggregate_batch(&mode, &batch, &mut accumulators, &expressions)?;
timer.done();
}
// 2. convert values to a record batch
let timer = elapsed_compute.timer();
let batch = finalize_aggregation(&accumulators, &mode)
- .map(|columns| RecordBatch::try_new(schema.clone(), columns))
- .map_err(DataFusionError::into_arrow_external_error)?;
+ .map(|columns| RecordBatch::try_new(schema.clone(), columns))?;
timer.done();
batch
}
@@ -904,9 +897,7 @@ fn create_batch_from_map(
match mode {
AggregateMode::Partial => {
for acc in accs.iter() {
- let state = acc
- .state()
- .map_err(DataFusionError::into_arrow_external_error)?;
+ let state = acc.state()?;
acc_data_types.push(state.len());
}
}
@@ -924,8 +915,7 @@ fn create_batch_from_map(
.map(|group_state| group_state.group_by_values[i].clone()),
)
})
- .collect::<Result<Vec<_>>>()
- .map_err(|x| x.into_arrow_external_error())?;
+ .collect::<Result<Vec<_>>>()?;
// add state / evaluated arrays
for (x, &state_len) in acc_data_types.iter().enumerate() {
@@ -937,8 +927,7 @@ fn create_batch_from_map(
let x =
group_state.accumulator_set[x].state().unwrap();
x[y].clone()
}),
- )
- .map_err(DataFusionError::into_arrow_external_error)?;
+ )?;
columns.push(res);
}
@@ -947,8 +936,7 @@ fn create_batch_from_map(
accumulators.group_states.iter().map(|group_state| {
group_state.accumulator_set[x].evaluate().unwrap()
}),
- )
- .map_err(DataFusionError::into_arrow_external_error)?;
+ )?;
columns.push(res);
}
}
diff --git a/datafusion/src/physical_plan/projection.rs b/datafusion/src/physical_plan/projection.rs
index 6d9d587..4d0cc61 100644
--- a/datafusion/src/physical_plan/projection.rs
+++ b/datafusion/src/physical_plan/projection.rs
@@ -236,15 +236,14 @@ impl ProjectionStream {
fn batch_project(&self, batch: &RecordBatch) -> ArrowResult<RecordBatch> {
// records time on drop
let _timer = self.baseline_metrics.elapsed_compute().timer();
- self.expr
+ let arrays = self
+ .expr
.iter()
.map(|expr| expr.evaluate(batch))
.map(|r| r.map(|v| v.into_array(batch.num_rows())))
- .collect::<Result<Vec<_>>>()
- .map_or_else(
- |e| Err(DataFusionError::into_arrow_external_error(e)),
- |arrays| RecordBatch::try_new(self.schema.clone(), arrays),
- )
+ .collect::<Result<Vec<_>>>()?;
+
+ RecordBatch::try_new(self.schema.clone(), arrays)
}
}
diff --git a/datafusion/src/physical_plan/repartition.rs b/datafusion/src/physical_plan/repartition.rs
index d69ecbd..7460754 100644
--- a/datafusion/src/physical_plan/repartition.rs
+++ b/datafusion/src/physical_plan/repartition.rs
@@ -416,7 +416,7 @@ impl RepartitionExec {
Err(e) => {
for (_, tx) in txs {
let err = DataFusionError::Execution(format!("Join Error:
{}", e));
- let err = Err(err.into_arrow_external_error());
+ let err = Err(err.into());
tx.send(Some(err)).ok();
}
}
@@ -425,7 +425,7 @@ impl RepartitionExec {
for (_, tx) in txs {
// wrap it because need to send error to all output
partitions
let err = DataFusionError::Execution(e.to_string());
- let err = Err(err.into_arrow_external_error());
+ let err = Err(err.into());
tx.send(Some(err)).ok();
}
}
diff --git a/datafusion/src/physical_plan/sorts/sort.rs b/datafusion/src/physical_plan/sorts/sort.rs
index 5ea5a72..e091464 100644
--- a/datafusion/src/physical_plan/sorts/sort.rs
+++ b/datafusion/src/physical_plan/sorts/sort.rs
@@ -634,8 +634,7 @@ fn sort_batch(
&expr
.iter()
.map(|e| e.evaluate_to_sort_column(&batch))
- .collect::<Result<Vec<SortColumn>>>()
- .map_err(DataFusionError::into_arrow_external_error)?,
+ .collect::<Result<Vec<SortColumn>>>()?,
None,
)?;
diff --git a/datafusion/src/physical_plan/windows/window_agg_exec.rs b/datafusion/src/physical_plan/windows/window_agg_exec.rs
index b86ac1b..491e0eb 100644
--- a/datafusion/src/physical_plan/windows/window_agg_exec.rs
+++ b/datafusion/src/physical_plan/windows/window_agg_exec.rs
@@ -273,9 +273,7 @@ impl WindowAggStream {
elapsed_compute: crate::physical_plan::metrics::Time,
) -> ArrowResult<RecordBatch> {
let input_schema = input.schema();
- let batches = common::collect(input)
- .await
- .map_err(DataFusionError::into_arrow_external_error)?;
+ let batches = common::collect(input).await?;
// record compute time on drop
let _timer = elapsed_compute.timer();
@@ -283,8 +281,7 @@ impl WindowAggStream {
let batch = common::combine_batches(&batches, input_schema.clone())?;
if let Some(batch) = batch {
// calculate window cols
- let mut columns = compute_window_aggregates(window_expr, &batch)
- .map_err(DataFusionError::into_arrow_external_error)?;
+ let mut columns = compute_window_aggregates(window_expr, &batch)?;
// combine with the original cols
// note the setup of window aggregates is that they newly
calculated window
// expressions are always prepended to the columns