alamb commented on a change in pull request #638:
URL: https://github.com/apache/arrow-datafusion/pull/638#discussion_r660878629
##########
File path: datafusion/src/physical_plan/expressions/nth_value.rs
##########
@@ -138,21 +140,56 @@ pub(crate) struct NthValueEvaluator {
}
impl PartitionEvaluator for NthValueEvaluator {
- fn evaluate_partition(&self, partition: Range<usize>) -> Result<ArrayRef> {
- let value = &self.values[0];
+ fn include_rank(&self) -> bool {
+ true
+ }
+
+ fn evaluate_partition(&self, _partition: Range<usize>) -> Result<ArrayRef>
{
+ unreachable!("first, last, and nth_value evaluation must be called
with evaluate_partition_with_rank")
+ }
+
+ fn evaluate_partition_with_rank(
+ &self,
+ partition: Range<usize>,
+ ranks_in_partition: &[Range<usize>],
+ ) -> Result<ArrayRef> {
+ let arr = &self.values[0];
let num_rows = partition.end - partition.start;
- let value = value.slice(partition.start, num_rows);
- let index: usize = match self.kind {
- NthValueKind::First => 0,
- NthValueKind::Last => (num_rows as usize) - 1,
- NthValueKind::Nth(n) => (n as usize) - 1,
- };
- Ok(if index >= num_rows {
- new_null_array(value.data_type(), num_rows)
- } else {
- let value = ScalarValue::try_from_array(&value, index)?;
- value.to_array_of_size(num_rows)
- })
+ match self.kind {
+ NthValueKind::First => {
+ let value = ScalarValue::try_from_array(arr, partition.start)?;
+ Ok(value.to_array_of_size(num_rows))
+ }
+ NthValueKind::Last => {
+ // because the default window frame is between unbounded
preceding and current
+ // row with peer evaluation, hence the last rows expands until
the end of the peers
+ let values = ranks_in_partition
+ .iter()
+ .map(|range| {
+ let len = range.end - range.start;
+ let value = ScalarValue::try_from_array(arr, range.end
- 1)?;
+ Ok(iter::repeat(value).take(len))
+ })
+ .collect::<Result<Vec<_>>>()?
+ .into_iter()
+ .flatten();
+ ScalarValue::iter_to_array(values)
+ }
+ NthValueKind::Nth(n) => {
+ let index = (n as usize) - 1;
+ if index >= num_rows {
+ Ok(new_null_array(arr.data_type(), num_rows))
+ } else {
+ let value =
+ ScalarValue::try_from_array(arr, partition.start +
index)?;
+ let arr = value.to_array_of_size(num_rows);
+ // because the default window frame is between unbounded
preceding and current
+ // row, hence the shift because for values with indices <
index they should be
+ // null. This changes when window frames other than
default is implemented
+ shift(arr.as_ref(), index as
i64).map_err(DataFusionError::ArrowError)
Review comment:
👍
##########
File path: datafusion/src/physical_plan/expressions/nth_value.rs
##########
@@ -138,21 +140,56 @@ pub(crate) struct NthValueEvaluator {
}
impl PartitionEvaluator for NthValueEvaluator {
- fn evaluate_partition(&self, partition: Range<usize>) -> Result<ArrayRef> {
- let value = &self.values[0];
+ fn include_rank(&self) -> bool {
+ true
+ }
+
+ fn evaluate_partition(&self, _partition: Range<usize>) -> Result<ArrayRef>
{
+ unreachable!("first, last, and nth_value evaluation must be called
with evaluate_partition_with_rank")
+ }
+
+ fn evaluate_partition_with_rank(
+ &self,
+ partition: Range<usize>,
+ ranks_in_partition: &[Range<usize>],
+ ) -> Result<ArrayRef> {
+ let arr = &self.values[0];
let num_rows = partition.end - partition.start;
- let value = value.slice(partition.start, num_rows);
- let index: usize = match self.kind {
- NthValueKind::First => 0,
- NthValueKind::Last => (num_rows as usize) - 1,
- NthValueKind::Nth(n) => (n as usize) - 1,
- };
- Ok(if index >= num_rows {
- new_null_array(value.data_type(), num_rows)
- } else {
- let value = ScalarValue::try_from_array(&value, index)?;
- value.to_array_of_size(num_rows)
- })
+ match self.kind {
+ NthValueKind::First => {
+ let value = ScalarValue::try_from_array(arr, partition.start)?;
+ Ok(value.to_array_of_size(num_rows))
+ }
+ NthValueKind::Last => {
+ // because the default window frame is between unbounded
preceding and current
+ // row with peer evaluation, hence the last rows expands until
the end of the peers
+ let values = ranks_in_partition
+ .iter()
+ .map(|range| {
+ let len = range.end - range.start;
+ let value = ScalarValue::try_from_array(arr, range.end
- 1)?;
Review comment:
this is very cool
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org