alamb commented on code in PR #7401:
URL: https://github.com/apache/arrow-datafusion/pull/7401#discussion_r1324709045


##########
datafusion/core/src/physical_plan/row_converter.rs:
##########
@@ -0,0 +1,292 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`CardinalityAwareRowConverter`] for converting data to
+//! [`arrow::row`] format.
+
+use arrow::datatypes::DataType;
+use arrow::row::{Row, RowConverter};
+use arrow::row::{Rows, SortField};
+use arrow_array::cast::AsArray;
+use arrow_array::*;
+use datafusion_common::{internal_err, DataFusionError, Result};
+
+/// The threshold of the number of values at which to consider a
+/// [`DictionaryArray`] "high" cardinality.
+///
+/// Since [`RowConverter`] blindly generates a mapping for all values,
+/// regardless of if they appear in the keys, this value is compared
+/// to the length of values.
+///
+/// The assumption is that the number of potential distinct key values
+/// (aka the length of the values array) is a more robust predictor of
+/// being "high" cardinality than the actual number of keys used. The
+/// intuition for this is that other values in the dictionary could be
+/// used in subsequent batches.
+///
+/// While an argument can made for doing something more sophisticated,
+/// this would likely only really make sense if the dictionary
+/// interner itself followed a similar approach, which it did not at
+/// the time of this writing.
+const LOW_CARDINALITY_THRESHOLD: usize = 10;
+
+/// Wrapper around an [`arrow::row`] [`RowConverter`] that disables
+/// dictionary preservation for high cardinality columns, based on the
+/// observed cardinalities in the first columns converted.
+///
+/// ## Background
+///
+/// By default, the [`RowConverter`] interns (and keeps a copy of) all
+/// values from [`DictionaryArray`]s. For low cardinality columns
+/// (with few distinct values) this approach is both faster and more
+/// memory efficient. However for high cardinality coumns it is slower
+/// and requires more memory. In certain degenerate cases, such as
+/// columns of nearly unique values, the `RowConverter` will keep a
+/// copy of the entire column.
+///
+/// See <https://github.com/apache/arrow-datafusion/issues/7200> for
+/// more details
+#[derive(Debug)]
+pub enum CardinalityAwareRowConverter {
+    /// Converter is newly initialized, and hasn't yet seen data
+    New {
+        /// Defines the Row conversion
+        fields: Vec<SortField>,
+    },
+    /// Converter has seen data and can convert [`Array`]s to/from
+    /// [`Rows`]
+    Converting {
+        /// Underlying converter
+        converter: RowConverter,
+        /// if preserve_dictionaries is disabled, the output type can be
+        /// different than input.
+        output_types: Vec<DataType>,
+    },
+}
+
+impl CardinalityAwareRowConverter {
+    pub fn new(fields: Vec<SortField>) -> Result<Self> {
+        Ok(Self::New { fields })
+    }
+
+    /// Returns the memory size of the underlying [`RowConverter`] if
+    /// any.
+    pub fn size(&self) -> usize {
+        match self {
+            Self::New { .. } => 0,
+            Self::Converting { converter, .. } => converter.size(),
+        }
+    }
+
+    pub fn empty_rows(&self, row_capacity: usize, data_capacity: usize) -> 
Result<Rows> {
+        match self {
+            Self::New { .. } => internal_err!(
+                "CardinalityAwareRowConverter has not converted any rows yet"
+            ),
+            Self::Converting { converter, .. } => {
+                Ok(converter.empty_rows(row_capacity, data_capacity))
+            }
+        }
+    }
+
+    pub fn convert_rows<'a, I>(&self, rows: I) -> Result<Vec<ArrayRef>>
+    where
+        I: IntoIterator<Item = Row<'a>>,
+    {
+        match self {
+            Self::New { .. } => internal_err!(
+                "CardinalityAwareRowConverter has not converted any rows yet"
+            ),
+            Self::Converting {
+                converter,
+                output_types,
+            } => {
+                // Cast output type if needed
+                let output = converter
+                    .convert_rows(rows)?
+                    .into_iter()
+                    .zip(output_types.iter())
+                    .map(|(arr, output_type)| {
+                        if arr.data_type() != output_type {
+                            Ok(arrow::compute::cast(&arr, output_type)?)
+                        } else {
+                            Ok(arr)
+                        }
+                    })
+                    .collect::<Result<Vec<_>>>()?;
+
+                Ok(output)
+            }
+        }
+    }
+
+    /// Calls [`RowConverter::convert_columns`] after first
+    /// initializing the converter based on cardinalities
+    pub fn convert_columns(&mut self, columns: &[ArrayRef]) -> Result<Rows> {
+        Ok(self.converter_mut(columns)?.convert_columns(columns)?)
+    }
+
+    /// Return a mutable reference to the inner converter, creating it if 
needed
+    fn converter_mut(&mut self, columns: &[ArrayRef]) -> Result<&mut 
RowConverter> {
+        if let Self::New { fields } = self {
+            // TODO clean up the code

Review Comment:
   I think it is a left over -- I will remove it



##########
datafusion/core/src/physical_plan/row_converter.rs:
##########
@@ -0,0 +1,292 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`CardinalityAwareRowConverter`] for converting data to
+//! [`arrow::row`] format.
+
+use arrow::datatypes::DataType;
+use arrow::row::{Row, RowConverter};
+use arrow::row::{Rows, SortField};
+use arrow_array::cast::AsArray;
+use arrow_array::*;
+use datafusion_common::{internal_err, DataFusionError, Result};
+
+/// The threshold of the number of values at which to consider a
+/// [`DictionaryArray`] "high" cardinality.
+///
+/// Since [`RowConverter`] blindly generates a mapping for all values,
+/// regardless of if they appear in the keys, this value is compared
+/// to the length of values.
+///
+/// The assumption is that the number of potential distinct key values
+/// (aka the length of the values array) is a more robust predictor of
+/// being "high" cardinality than the actual number of keys used. The
+/// intuition for this is that other values in the dictionary could be
+/// used in subsequent batches.
+///
+/// While an argument can made for doing something more sophisticated,
+/// this would likely only really make sense if the dictionary
+/// interner itself followed a similar approach, which it did not at
+/// the time of this writing.
+const LOW_CARDINALITY_THRESHOLD: usize = 10;
+
+/// Wrapper around an [`arrow::row`] [`RowConverter`] that disables
+/// dictionary preservation for high cardinality columns, based on the
+/// observed cardinalities in the first columns converted.
+///
+/// ## Background
+///
+/// By default, the [`RowConverter`] interns (and keeps a copy of) all
+/// values from [`DictionaryArray`]s. For low cardinality columns
+/// (with few distinct values) this approach is both faster and more
+/// memory efficient. However for high cardinality coumns it is slower
+/// and requires more memory. In certain degenerate cases, such as
+/// columns of nearly unique values, the `RowConverter` will keep a
+/// copy of the entire column.
+///
+/// See <https://github.com/apache/arrow-datafusion/issues/7200> for
+/// more details
+#[derive(Debug)]
+pub enum CardinalityAwareRowConverter {
+    /// Converter is newly initialized, and hasn't yet seen data
+    New {
+        /// Defines the Row conversion
+        fields: Vec<SortField>,
+    },
+    /// Converter has seen data and can convert [`Array`]s to/from
+    /// [`Rows`]
+    Converting {
+        /// Underlying converter
+        converter: RowConverter,
+        /// if preserve_dictionaries is disabled, the output type can be
+        /// different than input.
+        output_types: Vec<DataType>,
+    },
+}
+
+impl CardinalityAwareRowConverter {
+    pub fn new(fields: Vec<SortField>) -> Result<Self> {
+        Ok(Self::New { fields })
+    }
+
+    /// Returns the memory size of the underlying [`RowConverter`] if
+    /// any.
+    pub fn size(&self) -> usize {
+        match self {
+            Self::New { .. } => 0,
+            Self::Converting { converter, .. } => converter.size(),
+        }
+    }
+
+    pub fn empty_rows(&self, row_capacity: usize, data_capacity: usize) -> 
Result<Rows> {
+        match self {
+            Self::New { .. } => internal_err!(
+                "CardinalityAwareRowConverter has not converted any rows yet"
+            ),
+            Self::Converting { converter, .. } => {
+                Ok(converter.empty_rows(row_capacity, data_capacity))
+            }
+        }
+    }
+
+    pub fn convert_rows<'a, I>(&self, rows: I) -> Result<Vec<ArrayRef>>
+    where
+        I: IntoIterator<Item = Row<'a>>,
+    {
+        match self {
+            Self::New { .. } => internal_err!(
+                "CardinalityAwareRowConverter has not converted any rows yet"
+            ),
+            Self::Converting {
+                converter,
+                output_types,
+            } => {
+                // Cast output type if needed
+                let output = converter
+                    .convert_rows(rows)?
+                    .into_iter()
+                    .zip(output_types.iter())
+                    .map(|(arr, output_type)| {
+                        if arr.data_type() != output_type {
+                            Ok(arrow::compute::cast(&arr, output_type)?)
+                        } else {
+                            Ok(arr)
+                        }
+                    })
+                    .collect::<Result<Vec<_>>>()?;
+
+                Ok(output)
+            }
+        }
+    }
+
+    /// Calls [`RowConverter::convert_columns`] after first
+    /// initializing the converter based on cardinalities
+    pub fn convert_columns(&mut self, columns: &[ArrayRef]) -> Result<Rows> {
+        Ok(self.converter_mut(columns)?.convert_columns(columns)?)
+    }
+
+    /// Return a mutable reference to the inner converter, creating it if 
needed
+    fn converter_mut(&mut self, columns: &[ArrayRef]) -> Result<&mut 
RowConverter> {
+        if let Self::New { fields } = self {
+            // TODO clean up the code
+            let mut updated_fields = fields.clone();
+            let mut output_types = vec![];
+            for (i, col) in columns.iter().enumerate() {
+                output_types.push(col.data_type().clone());
+                if let DataType::Dictionary(_, _) = col.data_type() {
+                    // see comments on LOW_CARDINALITY_THRESHOLD for
+                    // the rationale of this calculation
+                    let cardinality = 
col.as_any_dictionary_opt().unwrap().values().len();

Review Comment:
   Yeah, it is a nice API



##########
datafusion/core/tests/memory_limit.rs:
##########
@@ -238,7 +238,7 @@ async fn sort_preserving_merge() {
 
 #[tokio::test]
 async fn sort_spill_reservation() {
-    let partition_size = batches_byte_size(&dict_batches());
+    let partition_size = 
batches_byte_size(&maybe_split_batches(dict_batches(), true));

Review Comment:
   it is needed to match the change in data (which is also split). I tried to 
make this clearer in 355ef7397. 



##########
datafusion/core/src/physical_plan/row_converter.rs:
##########
@@ -0,0 +1,292 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`CardinalityAwareRowConverter`] for converting data to
+//! [`arrow::row`] format.
+
+use arrow::datatypes::DataType;
+use arrow::row::{Row, RowConverter};
+use arrow::row::{Rows, SortField};
+use arrow_array::cast::AsArray;
+use arrow_array::*;
+use datafusion_common::{internal_err, DataFusionError, Result};
+
+/// The threshold of the number of values at which to consider a
+/// [`DictionaryArray`] "high" cardinality.
+///
+/// Since [`RowConverter`] blindly generates a mapping for all values,
+/// regardless of if they appear in the keys, this value is compared
+/// to the length of values.
+///
+/// The assumption is that the number of potential distinct key values
+/// (aka the length of the values array) is a more robust predictor of
+/// being "high" cardinality than the actual number of keys used. The
+/// intuition for this is that other values in the dictionary could be
+/// used in subsequent batches.
+///
+/// While an argument can made for doing something more sophisticated,
+/// this would likely only really make sense if the dictionary
+/// interner itself followed a similar approach, which it did not at
+/// the time of this writing.
+const LOW_CARDINALITY_THRESHOLD: usize = 10;
+
+/// Wrapper around an [`arrow::row`] [`RowConverter`] that disables
+/// dictionary preservation for high cardinality columns, based on the
+/// observed cardinalities in the first columns converted.
+///
+/// ## Background
+///
+/// By default, the [`RowConverter`] interns (and keeps a copy of) all
+/// values from [`DictionaryArray`]s. For low cardinality columns
+/// (with few distinct values) this approach is both faster and more
+/// memory efficient. However for high cardinality coumns it is slower
+/// and requires more memory. In certain degenerate cases, such as
+/// columns of nearly unique values, the `RowConverter` will keep a
+/// copy of the entire column.
+///
+/// See <https://github.com/apache/arrow-datafusion/issues/7200> for
+/// more details
+#[derive(Debug)]
+pub enum CardinalityAwareRowConverter {
+    /// Converter is newly initialized, and hasn't yet seen data
+    New {
+        /// Defines the Row conversion
+        fields: Vec<SortField>,
+    },
+    /// Converter has seen data and can convert [`Array`]s to/from
+    /// [`Rows`]
+    Converting {
+        /// Underlying converter
+        converter: RowConverter,
+        /// if preserve_dictionaries is disabled, the output type can be
+        /// different than input.
+        output_types: Vec<DataType>,
+    },
+}
+
+impl CardinalityAwareRowConverter {
+    pub fn new(fields: Vec<SortField>) -> Result<Self> {
+        Ok(Self::New { fields })
+    }
+
+    /// Returns the memory size of the underlying [`RowConverter`] if
+    /// any.
+    pub fn size(&self) -> usize {
+        match self {
+            Self::New { .. } => 0,
+            Self::Converting { converter, .. } => converter.size(),
+        }
+    }
+
+    pub fn empty_rows(&self, row_capacity: usize, data_capacity: usize) -> 
Result<Rows> {
+        match self {
+            Self::New { .. } => internal_err!(
+                "CardinalityAwareRowConverter has not converted any rows yet"
+            ),
+            Self::Converting { converter, .. } => {
+                Ok(converter.empty_rows(row_capacity, data_capacity))
+            }
+        }
+    }
+
+    pub fn convert_rows<'a, I>(&self, rows: I) -> Result<Vec<ArrayRef>>
+    where
+        I: IntoIterator<Item = Row<'a>>,
+    {
+        match self {
+            Self::New { .. } => internal_err!(
+                "CardinalityAwareRowConverter has not converted any rows yet"
+            ),
+            Self::Converting {
+                converter,
+                output_types,
+            } => {
+                // Cast output type if needed
+                let output = converter
+                    .convert_rows(rows)?
+                    .into_iter()
+                    .zip(output_types.iter())
+                    .map(|(arr, output_type)| {
+                        if arr.data_type() != output_type {
+                            Ok(arrow::compute::cast(&arr, output_type)?)
+                        } else {
+                            Ok(arr)
+                        }
+                    })
+                    .collect::<Result<Vec<_>>>()?;
+
+                Ok(output)
+            }
+        }
+    }
+
+    /// Calls [`RowConverter::convert_columns`] after first
+    /// initializing the converter based on cardinalities
+    pub fn convert_columns(&mut self, columns: &[ArrayRef]) -> Result<Rows> {
+        Ok(self.converter_mut(columns)?.convert_columns(columns)?)
+    }
+
+    /// Return a mutable reference to the inner converter, creating it if 
needed
+    fn converter_mut(&mut self, columns: &[ArrayRef]) -> Result<&mut 
RowConverter> {
+        if let Self::New { fields } = self {
+            // TODO clean up the code
+            let mut updated_fields = fields.clone();

Review Comment:
   Sadly it seems like `SortOptions` does not implement `Default` 😢 
   
   
   ```
      --> datafusion/core/src/physical_plan/row_converter.rs:150:53
       |
   150 |             let mut updated_fields = std::mem::take(&mut fields);
       |                                      -------------- ^^^^^^^^^^^ the 
trait `std::default::Default` is not implemented for `&mut 
std::vec::Vec<arrow::arrow_row::SortField>`
       |                                      |
       |                                      required by a bound introduced by 
this call
       |
   note: required by a bound in `std::mem::take`
   ```



##########
datafusion/core/src/physical_plan/row_converter.rs:
##########
@@ -0,0 +1,292 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`CardinalityAwareRowConverter`] for converting data to
+//! [`arrow::row`] format.
+
+use arrow::datatypes::DataType;
+use arrow::row::{Row, RowConverter};
+use arrow::row::{Rows, SortField};
+use arrow_array::cast::AsArray;
+use arrow_array::*;
+use datafusion_common::{internal_err, DataFusionError, Result};
+
+/// The threshold of the number of values at which to consider a
+/// [`DictionaryArray`] "high" cardinality.
+///
+/// Since [`RowConverter`] blindly generates a mapping for all values,
+/// regardless of if they appear in the keys, this value is compared
+/// to the length of values.
+///
+/// The assumption is that the number of potential distinct key values
+/// (aka the length of the values array) is a more robust predictor of
+/// being "high" cardinality than the actual number of keys used. The
+/// intuition for this is that other values in the dictionary could be
+/// used in subsequent batches.
+///
+/// While an argument can made for doing something more sophisticated,
+/// this would likely only really make sense if the dictionary
+/// interner itself followed a similar approach, which it did not at
+/// the time of this writing.
+const LOW_CARDINALITY_THRESHOLD: usize = 10;
+
+/// Wrapper around an [`arrow::row`] [`RowConverter`] that disables
+/// dictionary preservation for high cardinality columns, based on the
+/// observed cardinalities in the first columns converted.
+///
+/// ## Background
+///
+/// By default, the [`RowConverter`] interns (and keeps a copy of) all
+/// values from [`DictionaryArray`]s. For low cardinality columns
+/// (with few distinct values) this approach is both faster and more
+/// memory efficient. However for high cardinality coumns it is slower
+/// and requires more memory. In certain degenerate cases, such as
+/// columns of nearly unique values, the `RowConverter` will keep a
+/// copy of the entire column.
+///
+/// See <https://github.com/apache/arrow-datafusion/issues/7200> for
+/// more details
+#[derive(Debug)]
+pub enum CardinalityAwareRowConverter {
+    /// Converter is newly initialized, and hasn't yet seen data
+    New {
+        /// Defines the Row conversion
+        fields: Vec<SortField>,
+    },
+    /// Converter has seen data and can convert [`Array`]s to/from
+    /// [`Rows`]
+    Converting {
+        /// Underlying converter
+        converter: RowConverter,
+        /// if preserve_dictionaries is disabled, the output type can be
+        /// different than input.
+        output_types: Vec<DataType>,
+    },
+}
+
+impl CardinalityAwareRowConverter {
+    pub fn new(fields: Vec<SortField>) -> Result<Self> {
+        Ok(Self::New { fields })
+    }
+
+    /// Returns the memory size of the underlying [`RowConverter`] if
+    /// any.
+    pub fn size(&self) -> usize {
+        match self {
+            Self::New { .. } => 0,
+            Self::Converting { converter, .. } => converter.size(),
+        }
+    }
+
+    pub fn empty_rows(&self, row_capacity: usize, data_capacity: usize) -> 
Result<Rows> {
+        match self {
+            Self::New { .. } => internal_err!(
+                "CardinalityAwareRowConverter has not converted any rows yet"
+            ),
+            Self::Converting { converter, .. } => {
+                Ok(converter.empty_rows(row_capacity, data_capacity))
+            }
+        }
+    }
+
+    pub fn convert_rows<'a, I>(&self, rows: I) -> Result<Vec<ArrayRef>>
+    where
+        I: IntoIterator<Item = Row<'a>>,
+    {
+        match self {
+            Self::New { .. } => internal_err!(
+                "CardinalityAwareRowConverter has not converted any rows yet"
+            ),
+            Self::Converting {
+                converter,
+                output_types,
+            } => {
+                // Cast output type if needed

Review Comment:
   in b673c09b0



##########
datafusion/core/src/physical_plan/row_converter.rs:
##########
@@ -0,0 +1,292 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`CardinalityAwareRowConverter`] for converting data to
+//! [`arrow::row`] format.
+
+use arrow::datatypes::DataType;
+use arrow::row::{Row, RowConverter};
+use arrow::row::{Rows, SortField};
+use arrow_array::cast::AsArray;
+use arrow_array::*;
+use datafusion_common::{internal_err, DataFusionError, Result};
+
+/// The threshold of the number of values at which to consider a
+/// [`DictionaryArray`] "high" cardinality.
+///
+/// Since [`RowConverter`] blindly generates a mapping for all values,
+/// regardless of if they appear in the keys, this value is compared
+/// to the length of values.
+///
+/// The assumption is that the number of potential distinct key values
+/// (aka the length of the values array) is a more robust predictor of
+/// being "high" cardinality than the actual number of keys used. The
+/// intuition for this is that other values in the dictionary could be
+/// used in subsequent batches.
+///
+/// While an argument can made for doing something more sophisticated,
+/// this would likely only really make sense if the dictionary
+/// interner itself followed a similar approach, which it did not at
+/// the time of this writing.
+const LOW_CARDINALITY_THRESHOLD: usize = 10;
+
+/// Wrapper around an [`arrow::row`] [`RowConverter`] that disables
+/// dictionary preservation for high cardinality columns, based on the
+/// observed cardinalities in the first columns converted.
+///
+/// ## Background
+///
+/// By default, the [`RowConverter`] interns (and keeps a copy of) all
+/// values from [`DictionaryArray`]s. For low cardinality columns
+/// (with few distinct values) this approach is both faster and more
+/// memory efficient. However for high cardinality coumns it is slower
+/// and requires more memory. In certain degenerate cases, such as
+/// columns of nearly unique values, the `RowConverter` will keep a
+/// copy of the entire column.
+///
+/// See <https://github.com/apache/arrow-datafusion/issues/7200> for
+/// more details
+#[derive(Debug)]
+pub enum CardinalityAwareRowConverter {
+    /// Converter is newly initialized, and hasn't yet seen data
+    New {
+        /// Defines the Row conversion
+        fields: Vec<SortField>,
+    },
+    /// Converter has seen data and can convert [`Array`]s to/from
+    /// [`Rows`]
+    Converting {
+        /// Underlying converter
+        converter: RowConverter,
+        /// if preserve_dictionaries is disabled, the output type can be
+        /// different than input.
+        output_types: Vec<DataType>,
+    },
+}
+
+impl CardinalityAwareRowConverter {
+    pub fn new(fields: Vec<SortField>) -> Result<Self> {
+        Ok(Self::New { fields })
+    }
+
+    /// Returns the memory size of the underlying [`RowConverter`] if
+    /// any.
+    pub fn size(&self) -> usize {
+        match self {
+            Self::New { .. } => 0,
+            Self::Converting { converter, .. } => converter.size(),
+        }
+    }
+
+    pub fn empty_rows(&self, row_capacity: usize, data_capacity: usize) -> 
Result<Rows> {
+        match self {
+            Self::New { .. } => internal_err!(
+                "CardinalityAwareRowConverter has not converted any rows yet"
+            ),
+            Self::Converting { converter, .. } => {
+                Ok(converter.empty_rows(row_capacity, data_capacity))
+            }
+        }
+    }
+
+    pub fn convert_rows<'a, I>(&self, rows: I) -> Result<Vec<ArrayRef>>
+    where
+        I: IntoIterator<Item = Row<'a>>,
+    {
+        match self {
+            Self::New { .. } => internal_err!(
+                "CardinalityAwareRowConverter has not converted any rows yet"
+            ),
+            Self::Converting {
+                converter,
+                output_types,
+            } => {
+                // Cast output type if needed
+                let output = converter
+                    .convert_rows(rows)?
+                    .into_iter()
+                    .zip(output_types.iter())
+                    .map(|(arr, output_type)| {
+                        if arr.data_type() != output_type {
+                            Ok(arrow::compute::cast(&arr, output_type)?)
+                        } else {
+                            Ok(arr)
+                        }
+                    })
+                    .collect::<Result<Vec<_>>>()?;
+
+                Ok(output)
+            }
+        }
+    }
+
+    /// Calls [`RowConverter::convert_columns`] after first
+    /// initializing the converter based on cardinalities
+    pub fn convert_columns(&mut self, columns: &[ArrayRef]) -> Result<Rows> {
+        Ok(self.converter_mut(columns)?.convert_columns(columns)?)
+    }
+
+    /// Return a mutable reference to the inner converter, creating it if 
needed
+    fn converter_mut(&mut self, columns: &[ArrayRef]) -> Result<&mut 
RowConverter> {
+        if let Self::New { fields } = self {
+            // TODO clean up the code
+            let mut updated_fields = fields.clone();

Review Comment:
   Sadly it seems like `SortOptions` does not implement `Default` 😢 
   
   
   ```
      --> datafusion/core/src/physical_plan/row_converter.rs:150:53
       |
   150 |             let mut updated_fields = std::mem::take(&mut fields);
       |                                      -------------- ^^^^^^^^^^^ the 
trait `std::default::Default` is not implemented for `&mut 
std::vec::Vec<arrow::arrow_row::SortField>`
       |                                      |
       |                                      required by a bound introduced by 
this call
       |
   note: required by a bound in `std::mem::take`
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to