tustvold commented on code in PR #5101:
URL: https://github.com/apache/arrow-datafusion/pull/5101#discussion_r1089817867


##########
datafusion/core/src/physical_plan/joins/nested_loop_join.rs:
##########
@@ -400,12 +399,9 @@ impl NestedLoopJoinStream {
                     let mut left_indices_builder = UInt64Builder::new();
                     let mut right_indices_builder = UInt32Builder::new();
                     let left_right_indices = match indices_result {
-                        Err(_) => {
-                            // TODO why the type of result stream is `Result<T, ArrowError>`, and not the `DataFusionError`

Review Comment:
   :laughing: 



##########
datafusion/core/src/physical_plan/filter.rs:
##########
@@ -239,20 +238,21 @@ struct FilterExecStream {
 fn batch_filter(
     batch: &RecordBatch,
     predicate: &Arc<dyn PhysicalExpr>,
-) -> ArrowResult<RecordBatch> {
+) -> Result<RecordBatch> {
     predicate
         .evaluate(batch)
         .map(|v| v.into_array(batch.num_rows()))
-        .map_err(DataFusionError::into)
         .and_then(|array| {
             Ok(as_boolean_array(&array)?)
                 // apply filter array to record batch
-                .and_then(|filter_array| filter_record_batch(batch, filter_array))
+                .and_then(|filter_array| {
+                    filter_record_batch(batch, filter_array).map_err(Into::into)

Review Comment:
   ```suggestion
                       Ok(filter_record_batch(batch, filter_array)?)
   ```
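
The suggestion relies on the `?` operator doing the conversion that `.map_err(Into::into)` spells out by hand: when the surrounding function returns `Result<_, DataFusionError>`, `?` calls `From::from` on the `ArrowError` before propagating it. Below is a minimal, self-contained sketch of that equivalence; the error types and the `arrow_kernel` helper are stand-ins for illustration, not the real arrow/datafusion definitions.

```rust
// Stand-in error types; the real ones live in arrow-rs and datafusion.
#[derive(Debug)]
struct ArrowError(String);

#[derive(Debug)]
enum DataFusionError {
    ArrowError(ArrowError),
}

// `?` needs this `From` impl to turn an `ArrowError` into a `DataFusionError`.
impl From<ArrowError> for DataFusionError {
    fn from(e: ArrowError) -> Self {
        DataFusionError::ArrowError(e)
    }
}

// Stand-in for an arrow kernel such as `filter_record_batch`.
fn arrow_kernel(fail: bool) -> Result<u64, ArrowError> {
    if fail {
        Err(ArrowError("boom".to_string()))
    } else {
        Ok(42)
    }
}

// The explicit conversion the PR currently uses.
fn with_map_err(fail: bool) -> Result<u64, DataFusionError> {
    arrow_kernel(fail).map_err(Into::into)
}

// The suggested form: `?` performs the same `From` conversion implicitly.
fn with_question_mark(fail: bool) -> Result<u64, DataFusionError> {
    Ok(arrow_kernel(fail)?)
}

fn main() {
    assert_eq!(with_map_err(false).unwrap(), 42);
    assert_eq!(with_question_mark(false).unwrap(), 42);
    assert!(with_question_mark(true).is_err());
}
```

Both spellings behave identically; the `Ok(...?)` form is simply shorter, which is presumably why it is suggested throughout this review.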



##########
datafusion/core/src/physical_plan/windows/window_agg_exec.rs:
##########
@@ -348,28 +348,26 @@ impl WindowAggStream {
         // Calculate window cols
         for partition_point in partition_points {
             let length = partition_point.end - partition_point.start;
-            partition_results.push(
-                compute_window_aggregates(
-                    &self.window_expr,
-                    &batch.slice(partition_point.start, length),
-                )
-                .map_err(|e| ArrowError::ExternalError(Box::new(e)))?,
-            )
+            partition_results.push(compute_window_aggregates(
+                &self.window_expr,
+                &batch.slice(partition_point.start, length),
+            )?)
         }
         let columns = transpose(partition_results)
             .iter()
             .map(|elems| concat(&elems.iter().map(|x| x.as_ref()).collect::<Vec<_>>()))
             .collect::<Vec<_>>()
             .into_iter()
-            .collect::<ArrowResult<Vec<ArrayRef>>>()?;
+            .collect::<Result<Vec<ArrayRef>, ArrowError>>()
+            .map_err(Into::<DataFusionError>::into)?;
 
         // combine with the original cols
         // note the setup of window aggregates is that they newly calculated window
         // expression results are always appended to the columns
         let mut batch_columns = batch.columns().to_vec();
         // calculate window cols
         batch_columns.extend_from_slice(&columns);
-        RecordBatch::try_new(self.schema.clone(), batch_columns)
+        RecordBatch::try_new(self.schema.clone(), batch_columns).map_err(Into::into)

Review Comment:
   ```suggestion
           Ok(RecordBatch::try_new(self.schema.clone(), batch_columns)?)
   ```



##########
datafusion/core/src/physical_plan/joins/sort_merge_join.rs:
##########
@@ -1045,7 +1044,8 @@ impl SMJStream {
                         .columns()
                         .iter()
                         .map(|column| take(column, &buffered_indices, None))
-                        .collect::<ArrowResult<Vec<_>>>()?
+                        .collect::<Result<Vec<_>, ArrowError>>()
+                        .map_err(Into::<DataFusionError>::into)?

Review Comment:
   ```suggestion
                           .collect::<Result<Vec<_>, ArrowError>>()?
   ```
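
The same `From`-based conversion is what lets the bare `?` replace `map_err(Into::<DataFusionError>::into)` here: collecting into `Result<Vec<_>, ArrowError>` short-circuits on the first failed `take`, and the trailing `?` then converts that `ArrowError` into the function's `DataFusionError`. A small self-contained sketch, again with stand-in types and a hypothetical scalar `take` rather than the real arrow kernel:

```rust
#[derive(Debug)]
struct ArrowError(String);

#[derive(Debug)]
enum DataFusionError {
    ArrowError(ArrowError),
}

impl From<ArrowError> for DataFusionError {
    fn from(e: ArrowError) -> Self {
        DataFusionError::ArrowError(e)
    }
}

// Hypothetical stand-in for `take(column, &indices, None)`.
fn take(column: i64) -> Result<i64, ArrowError> {
    if column < 0 {
        Err(ArrowError(format!("negative column: {column}")))
    } else {
        Ok(column * 10)
    }
}

fn build_columns(columns: &[i64]) -> Result<Vec<i64>, DataFusionError> {
    // Collect into `Result<Vec<_>, ArrowError>`: the first `Err` stops the loop.
    // The `?` then applies `From<ArrowError>` to produce a `DataFusionError`,
    // so no explicit `map_err` is required.
    let taken = columns
        .iter()
        .map(|&c| take(c))
        .collect::<Result<Vec<_>, ArrowError>>()?;
    Ok(taken)
}

fn main() {
    assert_eq!(build_columns(&[1, 2, 3]).unwrap(), vec![10, 20, 30]);
    assert!(build_columns(&[1, -2, 3]).is_err());
}
```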



##########
datafusion/core/src/physical_plan/joins/sort_merge_join.rs:
##########
@@ -1032,7 +1030,8 @@ impl SMJStream {
                 .columns()
                 .iter()
                 .map(|column| take(column, &streamed_indices, None))
-                .collect::<ArrowResult<Vec<_>>>()?;
+                .collect::<Result<Vec<_>, ArrowError>>()
+                .map_err(Into::<DataFusionError>::into)?;

Review Comment:
   ```suggestion
                   .collect::<Result<Vec<_>, ArrowError>>()?;
   ```



##########
datafusion/core/src/physical_plan/file_format/file_stream.rs:
##########
@@ -260,7 +258,11 @@ impl<F: FileOpener> FileStream<F> {
                         self.file_stream_metrics.time_scanning_until_data.stop();
                         self.file_stream_metrics.time_scanning_total.stop();
                         let result = result
-                            .and_then(|b| self.pc_projector.project(b, partition_values))
+                            .and_then(|b| {
+                                self.pc_projector
+                                    .project(b, partition_values)
+                                    .map_err(|e| ArrowError::ExternalError(e.into()))

Review Comment:
   Is this needed? `project` already returns `ArrowError` AFAICT.



##########
datafusion/core/src/physical_plan/filter.rs:
##########
@@ -239,20 +238,21 @@ struct FilterExecStream {
 fn batch_filter(
     batch: &RecordBatch,
     predicate: &Arc<dyn PhysicalExpr>,
-) -> ArrowResult<RecordBatch> {
+) -> Result<RecordBatch> {
     predicate
         .evaluate(batch)
         .map(|v| v.into_array(batch.num_rows()))
-        .map_err(DataFusionError::into)
         .and_then(|array| {
             Ok(as_boolean_array(&array)?)
                 // apply filter array to record batch
-                .and_then(|filter_array| filter_record_batch(batch, filter_array))
+                .and_then(|filter_array| {

Review Comment:
   I think this `and_then` is redundant, as it is called directly on an `Ok()`.
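
A tiny self-contained sketch of the equivalence this comment relies on: `Ok(x).and_then(f)` can never take the error path, so it is the same as calling `f(x)` directly. All names below are made up for illustration.

```rust
// Some fallible step, analogous to `filter_record_batch` in the real code.
fn double(x: i32) -> Result<i32, String> {
    Ok(x * 2)
}

// The pattern flagged in the review: `and_then` chained onto a fresh `Ok`.
fn via_and_then(x: i32) -> Result<i32, String> {
    // `Ok(x)` is never `Err`, so `and_then` always just invokes `double`.
    Ok(x).and_then(double)
}

// Equivalent, without the redundant wrapper.
fn direct(x: i32) -> Result<i32, String> {
    double(x)
}

fn main() {
    assert_eq!(via_and_then(21), direct(21));
}
```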



##########
datafusion/core/src/physical_plan/joins/utils.rs:
##########
@@ -821,7 +820,7 @@ pub(crate) fn build_batch_from_indices(
         };
         columns.push(array);
     }
-    RecordBatch::try_new(Arc::new(schema.clone()), columns)
+    RecordBatch::try_new(Arc::new(schema.clone()), columns).map_err(Into::into)

Review Comment:
   ```suggestion
       Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?)
   ```



##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -843,8 +839,9 @@ fn sort_batch(
                     }),
                 )
             })
-            .collect::<ArrowResult<Vec<ArrayRef>>>()?,
-    )?;
+            .collect::<Result<Vec<ArrayRef>, ArrowError>>()?,
+    )
+    .map_err(Into::<DataFusionError>::into)?;

Review Comment:
   ```suggestion
       )?;
   ```



##########
datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs:
##########
@@ -462,7 +461,7 @@ impl SortedPartitionByBoundedWindowStream {
         if let Some(columns_to_show) = columns_to_show {
             let n_generated = columns_to_show[0].len();
             self.prune_state(n_generated)?;
-            RecordBatch::try_new(schema, columns_to_show)
+            RecordBatch::try_new(schema, columns_to_show).map_err(Into::into)

Review Comment:
   ```suggestion
               Ok(RecordBatch::try_new(schema, columns_to_show)?)
   ```



##########
datafusion/core/src/physical_plan/windows/window_agg_exec.rs:
##########
@@ -348,28 +348,26 @@ impl WindowAggStream {
         // Calculate window cols
         for partition_point in partition_points {
             let length = partition_point.end - partition_point.start;
-            partition_results.push(
-                compute_window_aggregates(
-                    &self.window_expr,
-                    &batch.slice(partition_point.start, length),
-                )
-                .map_err(|e| ArrowError::ExternalError(Box::new(e)))?,
-            )
+            partition_results.push(compute_window_aggregates(
+                &self.window_expr,
+                &batch.slice(partition_point.start, length),
+            )?)
         }
         let columns = transpose(partition_results)
             .iter()
             .map(|elems| concat(&elems.iter().map(|x| x.as_ref()).collect::<Vec<_>>()))
             .collect::<Vec<_>>()
             .into_iter()
-            .collect::<ArrowResult<Vec<ArrayRef>>>()?;
+            .collect::<Result<Vec<ArrayRef>, ArrowError>>()
+            .map_err(Into::<DataFusionError>::into)?;

Review Comment:
   ```suggestion
               .collect::<Result<Vec<ArrayRef>, ArrowError>>()?;
   ```


