This is an automated email from the ASF dual-hosted git repository. alamb pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push: new f603d34e3b Update workspace to use Rust 1.89 (#17100) f603d34e3b is described below commit f603d34e3b07b5e435accb77324110908dad3545 Author: Shruti Sharma <98698727+shruti2...@users.noreply.github.com> AuthorDate: Wed Aug 13 01:37:36 2025 +0530 Update workspace to use Rust 1.89 (#17100) Co-authored-by: Andrew Lamb <and...@nerdnetworks.org> --- datafusion-examples/examples/parquet_index.rs | 2 +- datafusion/catalog/src/async.rs | 2 +- datafusion/catalog/src/cte_worktable.rs | 2 +- datafusion/catalog/src/default_table_source.rs | 2 +- datafusion/catalog/src/table.rs | 4 +- datafusion/catalog/src/view.rs | 2 +- datafusion/common/src/error.rs | 2 +- datafusion/common/src/utils/mod.rs | 2 +- datafusion/core/src/dataframe/mod.rs | 2 +- datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs | 2 +- datafusion/core/tests/fuzz_cases/join_fuzz.rs | 18 ++- datafusion/core/tests/macro_hygiene/mod.rs | 1 + datafusion/core/tests/parquet/file_statistics.rs | 2 +- .../physical_optimizer/partition_statistics.rs | 7 +- .../src/avro_to_arrow/arrow_array_reader.rs | 2 +- .../datasource-avro/src/avro_to_arrow/reader.rs | 2 +- datafusion/datasource-parquet/src/opener.rs | 4 +- datafusion/datasource/src/file_format.rs | 2 +- datafusion/datasource/src/file_scan_config.rs | 2 +- datafusion/datasource/src/memory.rs | 2 +- datafusion/datasource/src/sink.rs | 2 +- datafusion/datasource/src/statistics.rs | 4 +- datafusion/expr/src/table_source.rs | 2 +- .../functions-aggregate/benches/array_agg.rs | 2 +- datafusion/functions-aggregate/benches/count.rs | 8 +- datafusion/functions-aggregate/benches/sum.rs | 2 +- datafusion/functions/src/datetime/common.rs | 4 +- datafusion/functions/src/datetime/to_local_time.rs | 2 +- datafusion/functions/src/macros.rs | 1 + datafusion/functions/src/math/log.rs | 2 +- datafusion/functions/src/unicode/lpad.rs | 36 +++--- datafusion/functions/src/utils.rs | 121 ++++++++++----------- datafusion/optimizer/src/analyzer/type_coercion.rs | 2 +- .../src/simplify_expressions/expr_simplifier.rs | 2 +- .../physical-expr-common/src/physical_expr.rs | 8 +- datafusion/physical-expr/src/intervals/utils.rs | 4 +- .../aggregates/group_values/multi_group_by/mod.rs | 15 ++- datafusion/physical-plan/src/coalesce/mod.rs | 2 +- datafusion/physical-plan/src/filter.rs | 4 +- .../proto/tests/cases/roundtrip_logical_plan.rs | 1 + datafusion/spark/src/function/utils.rs | 87 ++++++++------- datafusion/sql/src/unparser/utils.rs | 2 +- rust-toolchain.toml | 2 +- 43 files changed, 204 insertions(+), 175 deletions(-) diff --git a/datafusion-examples/examples/parquet_index.rs b/datafusion-examples/examples/parquet_index.rs index a9c0d2c4da..afc3b279f4 100644 --- a/datafusion-examples/examples/parquet_index.rs +++ b/datafusion-examples/examples/parquet_index.rs @@ -313,7 +313,7 @@ impl Display for ParquetMetadataIndex { "ParquetMetadataIndex(last_num_pruned: {})", self.last_num_pruned() )?; - let batches = pretty_format_batches(&[self.index.clone()]).unwrap(); + let batches = pretty_format_batches(std::slice::from_ref(&self.index)).unwrap(); write!(f, "{batches}",) } } diff --git a/datafusion/catalog/src/async.rs b/datafusion/catalog/src/async.rs index 5d7a51ad71..1c830c976d 100644 --- a/datafusion/catalog/src/async.rs +++ b/datafusion/catalog/src/async.rs @@ -737,7 +737,7 @@ mod tests { ] { let async_provider = MockAsyncCatalogProviderList::default(); let cached_provider = async_provider - .resolve(&[table_ref.clone()], &test_config()) + .resolve(std::slice::from_ref(table_ref), &test_config()) .await .unwrap(); diff --git a/datafusion/catalog/src/cte_worktable.rs b/datafusion/catalog/src/cte_worktable.rs index d72a30909c..d6b2a45311 100644 --- a/datafusion/catalog/src/cte_worktable.rs +++ b/datafusion/catalog/src/cte_worktable.rs @@ -71,7 +71,7 @@ impl TableProvider for CteWorkTable { self } - fn get_logical_plan(&self) -> Option<Cow<LogicalPlan>> { + fn get_logical_plan(&'_ self) -> Option<Cow<'_, LogicalPlan>> { None } diff --git a/datafusion/catalog/src/default_table_source.rs b/datafusion/catalog/src/default_table_source.rs index c61c7919ea..11963c06c8 100644 --- a/datafusion/catalog/src/default_table_source.rs +++ b/datafusion/catalog/src/default_table_source.rs @@ -76,7 +76,7 @@ impl TableSource for DefaultTableSource { self.table_provider.supports_filters_pushdown(filter) } - fn get_logical_plan(&self) -> Option<Cow<datafusion_expr::LogicalPlan>> { + fn get_logical_plan(&'_ self) -> Option<Cow<'_, datafusion_expr::LogicalPlan>> { self.table_provider.get_logical_plan() } diff --git a/datafusion/catalog/src/table.rs b/datafusion/catalog/src/table.rs index 207abb9c66..ecfe63fa34 100644 --- a/datafusion/catalog/src/table.rs +++ b/datafusion/catalog/src/table.rs @@ -49,7 +49,7 @@ use datafusion_physical_plan::ExecutionPlan; /// [`CatalogProvider`]: super::CatalogProvider #[async_trait] pub trait TableProvider: Debug + Sync + Send { - /// Returns the table provider as [`Any`](std::any::Any) so that it can be + /// Returns the table provider as [`Any`] so that it can be /// downcast to a specific implementation. fn as_any(&self) -> &dyn Any; @@ -75,7 +75,7 @@ pub trait TableProvider: Debug + Sync + Send { } /// Get the [`LogicalPlan`] of this table, if available. - fn get_logical_plan(&self) -> Option<Cow<LogicalPlan>> { + fn get_logical_plan(&'_ self) -> Option<Cow<'_, LogicalPlan>> { None } diff --git a/datafusion/catalog/src/view.rs b/datafusion/catalog/src/view.rs index 8dfb79718c..3bb7214399 100644 --- a/datafusion/catalog/src/view.rs +++ b/datafusion/catalog/src/view.rs @@ -87,7 +87,7 @@ impl TableProvider for ViewTable { self } - fn get_logical_plan(&self) -> Option<Cow<LogicalPlan>> { + fn get_logical_plan(&'_ self) -> Option<Cow<'_, LogicalPlan>> { Some(Cow::Borrowed(&self.logical_plan)) } diff --git a/datafusion/common/src/error.rs b/datafusion/common/src/error.rs index 88029ea474..d08d2394e4 100644 --- a/datafusion/common/src/error.rs +++ b/datafusion/common/src/error.rs @@ -523,7 +523,7 @@ impl DataFusionError { } } - pub fn message(&self) -> Cow<str> { + pub fn message(&self) -> Cow<'_, str> { match *self { DataFusionError::ArrowError(ref desc, ref backtrace) => { let backtrace = backtrace.clone().unwrap_or_else(|| "".to_owned()); diff --git a/datafusion/common/src/utils/mod.rs b/datafusion/common/src/utils/mod.rs index 404f13b4df..3f776a44bc 100644 --- a/datafusion/common/src/utils/mod.rs +++ b/datafusion/common/src/utils/mod.rs @@ -260,7 +260,7 @@ pub fn evaluate_partition_ranges( /// the identifier by replacing it with two double quotes /// /// e.g. identifier `tab.le"name` becomes `"tab.le""name"` -pub fn quote_identifier(s: &str) -> Cow<str> { +pub fn quote_identifier(s: &str) -> Cow<'_, str> { if needs_quotes(s) { Cow::Owned(format!("\"{}\"", s.replace('"', "\"\""))) } else { diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index a19e6f5581..7a3739d36c 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -2425,7 +2425,7 @@ impl TableProvider for DataFrameTableProvider { self } - fn get_logical_plan(&self) -> Option<Cow<LogicalPlan>> { + fn get_logical_plan(&self) -> Option<Cow<'_, LogicalPlan>> { Some(Cow::Borrowed(&self.plan)) } diff --git a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs index bcf60eb2d7..c103daa885 100644 --- a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs @@ -320,7 +320,7 @@ async fn run_aggregate_test(input1: Vec<RecordBatch>, group_by_columns: Vec<&str .unwrap(); let running_source = DataSourceExec::from_data_source( - MemorySourceConfig::try_new(&[input1.clone()], schema.clone(), None) + MemorySourceConfig::try_new(std::slice::from_ref(&input1), schema.clone(), None) .unwrap() .try_with_sort_information(vec![sort_keys.into()]) .unwrap(), diff --git a/datafusion/core/tests/fuzz_cases/join_fuzz.rs b/datafusion/core/tests/fuzz_cases/join_fuzz.rs index 7250a263d8..0d24a7f0e2 100644 --- a/datafusion/core/tests/fuzz_cases/join_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/join_fuzz.rs @@ -477,12 +477,18 @@ impl JoinFuzzTestCase { fn left_right(&self) -> (Arc<DataSourceExec>, Arc<DataSourceExec>) { let schema1 = self.input1[0].schema(); let schema2 = self.input2[0].schema(); - let left = - MemorySourceConfig::try_new_exec(&[self.input1.clone()], schema1, None) - .unwrap(); - let right = - MemorySourceConfig::try_new_exec(&[self.input2.clone()], schema2, None) - .unwrap(); + let left = MemorySourceConfig::try_new_exec( + std::slice::from_ref(&self.input1), + schema1, + None, + ) + .unwrap(); + let right = MemorySourceConfig::try_new_exec( + std::slice::from_ref(&self.input2), + schema2, + None, + ) + .unwrap(); (left, right) } diff --git a/datafusion/core/tests/macro_hygiene/mod.rs b/datafusion/core/tests/macro_hygiene/mod.rs index e5396ce219..09fb38b72e 100644 --- a/datafusion/core/tests/macro_hygiene/mod.rs +++ b/datafusion/core/tests/macro_hygiene/mod.rs @@ -83,6 +83,7 @@ mod config_field { impl std::error::Error for E {} + #[allow(dead_code)] struct S; impl std::str::FromStr for S { diff --git a/datafusion/core/tests/parquet/file_statistics.rs b/datafusion/core/tests/parquet/file_statistics.rs index 65c7e12eb0..64ee92eda2 100644 --- a/datafusion/core/tests/parquet/file_statistics.rs +++ b/datafusion/core/tests/parquet/file_statistics.rs @@ -72,7 +72,7 @@ async fn check_stats_precision_with_filter_pushdown() { // source operator after the appropriate optimizer pass. let filter_expr = Expr::gt(col("id"), lit(1)); let exec_with_filter = table - .scan(&state, None, &[filter_expr.clone()], None) + .scan(&state, None, std::slice::from_ref(&filter_expr), None) .await .unwrap(); diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs b/datafusion/core/tests/physical_optimizer/partition_statistics.rs index 4b39e37f94..bfc09340cc 100644 --- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs +++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs @@ -746,8 +746,11 @@ mod test { for (i, partition_stream) in partitions.into_iter().enumerate() { let batches: Vec<RecordBatch> = partition_stream.try_collect().await?; let actual = plan.partition_statistics(Some(i))?; - let expected = - compute_record_batch_statistics(&[batches.clone()], &schema, None); + let expected = compute_record_batch_statistics( + std::slice::from_ref(&batches), + &schema, + None, + ); assert_eq!(actual, expected); all_batches.push(batches); } diff --git a/datafusion/datasource-avro/src/avro_to_arrow/arrow_array_reader.rs b/datafusion/datasource-avro/src/avro_to_arrow/arrow_array_reader.rs index 36553b36bc..2753c44bf6 100644 --- a/datafusion/datasource-avro/src/avro_to_arrow/arrow_array_reader.rs +++ b/datafusion/datasource-avro/src/avro_to_arrow/arrow_array_reader.rs @@ -1046,7 +1046,7 @@ mod test { use std::fs::File; use std::sync::Arc; - fn build_reader(name: &str, batch_size: usize) -> Reader<File> { + fn build_reader(name: &'_ str, batch_size: usize) -> Reader<'_, File> { let testdata = datafusion_common::test_util::arrow_test_data(); let filename = format!("{testdata}/avro/{name}"); let builder = ReaderBuilder::new() diff --git a/datafusion/datasource-avro/src/avro_to_arrow/reader.rs b/datafusion/datasource-avro/src/avro_to_arrow/reader.rs index 7f5900605a..9a4d13fc19 100644 --- a/datafusion/datasource-avro/src/avro_to_arrow/reader.rs +++ b/datafusion/datasource-avro/src/avro_to_arrow/reader.rs @@ -195,7 +195,7 @@ mod tests { use arrow::datatypes::{DataType, Field}; use std::fs::File; - fn build_reader(name: &str, projection: Option<Vec<String>>) -> Reader<File> { + fn build_reader(name: &'_ str, projection: Option<Vec<String>>) -> Reader<'_, File> { let testdata = datafusion_common::test_util::arrow_test_data(); let filename = format!("{testdata}/avro/{name}"); let mut builder = ReaderBuilder::new().read_schema().with_batch_size(64); diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 62dc0fccc2..af4a9075a6 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -237,11 +237,11 @@ impl FileOpener for ParquetOpener { )?; } - if coerce_int96.is_some() { + if let Some(ref coerce) = coerce_int96 { if let Some(merged) = coerce_int96_to_resolution( reader_metadata.parquet_schema(), &physical_file_schema, - &(coerce_int96.unwrap()), + coerce, ) { physical_file_schema = Arc::new(merged); options = options.with_schema(Arc::clone(&physical_file_schema)); diff --git a/datafusion/datasource/src/file_format.rs b/datafusion/datasource/src/file_format.rs index e0239ab36d..23f68636c1 100644 --- a/datafusion/datasource/src/file_format.rs +++ b/datafusion/datasource/src/file_format.rs @@ -48,7 +48,7 @@ pub const DEFAULT_SCHEMA_INFER_MAX_RECORD: usize = 1000; /// [`TableProvider`]: https://docs.rs/datafusion/latest/datafusion/catalog/trait.TableProvider.html #[async_trait] pub trait FileFormat: Send + Sync + fmt::Debug { - /// Returns the table provider as [`Any`](std::any::Any) so that it can be + /// Returns the table provider as [`Any`] so that it can be /// downcast to a specific implementation. fn as_any(&self) -> &dyn Any; diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 95cc9e24b6..4d03c46cf5 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -1982,7 +1982,7 @@ mod tests { ); let result = FileScanConfig::split_groups_by_statistics( &table_schema, - &[partitioned_files.clone()], + std::slice::from_ref(&partitioned_files), &sort_order, ); let results_by_name = result diff --git a/datafusion/datasource/src/memory.rs b/datafusion/datasource/src/memory.rs index f5eb354ea1..673c1b9dd4 100644 --- a/datafusion/datasource/src/memory.rs +++ b/datafusion/datasource/src/memory.rs @@ -650,7 +650,7 @@ impl RePartition { impl PartialOrd for RePartition { fn partial_cmp(&self, other: &Self) -> Option<Ordering> { - Some(self.row_count.cmp(&other.row_count)) + Some(self.cmp(other)) } } diff --git a/datafusion/datasource/src/sink.rs b/datafusion/datasource/src/sink.rs index b8c5b42bf7..13404bccac 100644 --- a/datafusion/datasource/src/sink.rs +++ b/datafusion/datasource/src/sink.rs @@ -46,7 +46,7 @@ use futures::StreamExt; /// output. #[async_trait] pub trait DataSink: DisplayAs + Debug + Send + Sync { - /// Returns the data sink as [`Any`](std::any::Any) so that it can be + /// Returns the data sink as [`Any`] so that it can be /// downcast to a specific implementation. fn as_any(&self) -> &dyn Any; diff --git a/datafusion/datasource/src/statistics.rs b/datafusion/datasource/src/statistics.rs index 5099bfa072..0dd9bdb87c 100644 --- a/datafusion/datasource/src/statistics.rs +++ b/datafusion/datasource/src/statistics.rs @@ -57,12 +57,12 @@ impl MinMaxStatistics { /// Min value at index #[allow(unused)] - pub fn min(&self, idx: usize) -> Row { + pub fn min(&'_ self, idx: usize) -> Row<'_> { self.min_by_sort_order.row(idx) } /// Max value at index - pub fn max(&self, idx: usize) -> Row { + pub fn max(&'_ self, idx: usize) -> Row<'_> { self.max_by_sort_order.row(idx) } diff --git a/datafusion/expr/src/table_source.rs b/datafusion/expr/src/table_source.rs index 81fec4a1b0..d3b253c0e1 100644 --- a/datafusion/expr/src/table_source.rs +++ b/datafusion/expr/src/table_source.rs @@ -121,7 +121,7 @@ pub trait TableSource: Sync + Send { /// Get the Logical plan of this table provider, if available. /// /// For example, a view may have a logical plan, but a CSV file does not. - fn get_logical_plan(&self) -> Option<Cow<LogicalPlan>> { + fn get_logical_plan(&'_ self) -> Option<Cow<'_, LogicalPlan>> { None } diff --git a/datafusion/functions-aggregate/benches/array_agg.rs b/datafusion/functions-aggregate/benches/array_agg.rs index 6dadb12aba..96444b0184 100644 --- a/datafusion/functions-aggregate/benches/array_agg.rs +++ b/datafusion/functions-aggregate/benches/array_agg.rs @@ -45,7 +45,7 @@ fn merge_batch_bench(c: &mut Criterion, name: &str, values: ArrayRef) { black_box( ArrayAggAccumulator::try_new(&list_item_data_type, false) .unwrap() - .merge_batch(&[values.clone()]) + .merge_batch(std::slice::from_ref(&values)) .unwrap(), ) }) diff --git a/datafusion/functions-aggregate/benches/count.rs b/datafusion/functions-aggregate/benches/count.rs index 80cb65be2e..37c7fad4bd 100644 --- a/datafusion/functions-aggregate/benches/count.rs +++ b/datafusion/functions-aggregate/benches/count.rs @@ -82,7 +82,7 @@ fn convert_to_state_bench( b.iter(|| { black_box( accumulator - .convert_to_state(&[values.clone()], opt_filter) + .convert_to_state(std::slice::from_ref(&values), opt_filter) .unwrap(), ) }) @@ -125,7 +125,11 @@ fn count_benchmark(c: &mut Criterion) { c.bench_function("count low cardinality dict 20% nulls, no filter", |b| { b.iter(|| { #[allow(clippy::unit_arg)] - black_box(accumulator.update_batch(&[values.clone()]).unwrap()) + black_box( + accumulator + .update_batch(std::slice::from_ref(&values)) + .unwrap(), + ) }) }); } diff --git a/datafusion/functions-aggregate/benches/sum.rs b/datafusion/functions-aggregate/benches/sum.rs index 4517db6b15..a1e9894fb8 100644 --- a/datafusion/functions-aggregate/benches/sum.rs +++ b/datafusion/functions-aggregate/benches/sum.rs @@ -56,7 +56,7 @@ fn convert_to_state_bench( b.iter(|| { black_box( accumulator - .convert_to_state(&[values.clone()], opt_filter) + .convert_to_state(std::slice::from_ref(&values), opt_filter) .unwrap(), ) }) diff --git a/datafusion/functions/src/datetime/common.rs b/datafusion/functions/src/datetime/common.rs index fd9f37d805..df7de0083d 100644 --- a/datafusion/functions/src/datetime/common.rs +++ b/datafusion/functions/src/datetime/common.rs @@ -412,8 +412,8 @@ where }?; let r = op(x, v); - if r.is_ok() { - val = Some(Ok(op2(r.unwrap()))); + if let Ok(inner) = r { + val = Some(Ok(op2(inner))); break; } else { val = Some(r); diff --git a/datafusion/functions/src/datetime/to_local_time.rs b/datafusion/functions/src/datetime/to_local_time.rs index b6d4404d6d..1f1c300f25 100644 --- a/datafusion/functions/src/datetime/to_local_time.rs +++ b/datafusion/functions/src/datetime/to_local_time.rs @@ -372,7 +372,7 @@ impl ScalarUDFImpl for ToLocalTimeFunc { ) -> Result<ColumnarValue> { let [time_value] = take_function_args(self.name(), args.args)?; - self.to_local_time(&[time_value.clone()]) + self.to_local_time(std::slice::from_ref(&time_value)) } fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> { diff --git a/datafusion/functions/src/macros.rs b/datafusion/functions/src/macros.rs index 30ebf8654e..03fad2f25e 100644 --- a/datafusion/functions/src/macros.rs +++ b/datafusion/functions/src/macros.rs @@ -73,6 +73,7 @@ macro_rules! export_functions { #[macro_export] macro_rules! make_udf_function { ($UDF:ty, $NAME:ident) => { + #[allow(rustdoc::redundant_explicit_links)] #[doc = concat!("Return a [`ScalarUDF`](datafusion_expr::ScalarUDF) implementation of ", stringify!($NAME))] pub fn $NAME() -> std::sync::Arc<datafusion_expr::ScalarUDF> { // Singleton instance of the function diff --git a/datafusion/functions/src/math/log.rs b/datafusion/functions/src/math/log.rs index 186d0d3c47..efa9a4c89b 100644 --- a/datafusion/functions/src/math/log.rs +++ b/datafusion/functions/src/math/log.rs @@ -648,7 +648,7 @@ mod tests { // Test log(num) for order in orders.iter().cloned() { - let result = log.output_ordering(&[order.clone()]).unwrap(); + let result = log.output_ordering(std::slice::from_ref(&order)).unwrap(); assert_eq!(result, order.sort_properties); } diff --git a/datafusion/functions/src/unicode/lpad.rs b/datafusion/functions/src/unicode/lpad.rs index ea57dbd2be..3c149cc68a 100644 --- a/datafusion/functions/src/unicode/lpad.rs +++ b/datafusion/functions/src/unicode/lpad.rs @@ -204,11 +204,15 @@ where V2: StringArrayType<'a>, T: OffsetSizeTrait, { - let array = if fill_array.is_none() { + let array = if let Some(fill_array) = fill_array { let mut builder: GenericStringBuilder<T> = GenericStringBuilder::new(); - for (string, length) in string_array.iter().zip(length_array.iter()) { - if let (Some(string), Some(length)) = (string, length) { + for ((string, length), fill) in string_array + .iter() + .zip(length_array.iter()) + .zip(fill_array.iter()) + { + if let (Some(string), Some(length), Some(fill)) = (string, length, fill) { if length > i32::MAX as i64 { return exec_err!("lpad requested length {length} too large"); } @@ -220,10 +224,17 @@ where } let graphemes = string.graphemes(true).collect::<Vec<&str>>(); + let fill_chars = fill.chars().collect::<Vec<char>>(); + if length < graphemes.len() { builder.append_value(graphemes[..length].concat()); + } else if fill_chars.is_empty() { + builder.append_value(string); } else { - builder.write_str(" ".repeat(length - graphemes.len()).as_str())?; + for l in 0..length - graphemes.len() { + let c = *fill_chars.get(l % fill_chars.len()).unwrap(); + builder.write_char(c)?; + } builder.write_str(string)?; builder.append_value(""); } @@ -236,12 +247,8 @@ where } else { let mut builder: GenericStringBuilder<T> = GenericStringBuilder::new(); - for ((string, length), fill) in string_array - .iter() - .zip(length_array.iter()) - .zip(fill_array.unwrap().iter()) - { - if let (Some(string), Some(length), Some(fill)) = (string, length, fill) { + for (string, length) in string_array.iter().zip(length_array.iter()) { + if let (Some(string), Some(length)) = (string, length) { if length > i32::MAX as i64 { return exec_err!("lpad requested length {length} too large"); } @@ -253,17 +260,10 @@ where } let graphemes = string.graphemes(true).collect::<Vec<&str>>(); - let fill_chars = fill.chars().collect::<Vec<char>>(); - if length < graphemes.len() { builder.append_value(graphemes[..length].concat()); - } else if fill_chars.is_empty() { - builder.append_value(string); } else { - for l in 0..length - graphemes.len() { - let c = *fill_chars.get(l % fill_chars.len()).unwrap(); - builder.write_char(c)?; - } + builder.write_str(" ".repeat(length - graphemes.len()).as_str())?; builder.write_str(string)?; builder.append_value(""); } diff --git a/datafusion/functions/src/utils.rs b/datafusion/functions/src/utils.rs index 0e9ef8dacd..5294d071a4 100644 --- a/datafusion/functions/src/utils.rs +++ b/datafusion/functions/src/utils.rs @@ -130,18 +130,18 @@ pub mod test { /// $ARRAY_TYPE is the column type after function applied /// $CONFIG_OPTIONS config options to pass to function macro_rules! test_function { - ($FUNC:expr, $ARGS:expr, $EXPECTED:expr, $EXPECTED_TYPE:ty, $EXPECTED_DATA_TYPE:expr, $ARRAY_TYPE:ident, $CONFIG_OPTIONS:expr) => { - let expected: Result<Option<$EXPECTED_TYPE>> = $EXPECTED; - let func = $FUNC; - - let data_array = $ARGS.iter().map(|arg| arg.data_type()).collect::<Vec<_>>(); - let cardinality = $ARGS - .iter() - .fold(Option::<usize>::None, |acc, arg| match arg { - ColumnarValue::Scalar(_) => acc, - ColumnarValue::Array(a) => Some(a.len()), - }) - .unwrap_or(1); + ($FUNC:expr, $ARGS:expr, $EXPECTED:expr, $EXPECTED_TYPE:ty, $EXPECTED_DATA_TYPE:expr, $ARRAY_TYPE:ident, $CONFIG_OPTIONS:expr) => { + let expected: Result<Option<$EXPECTED_TYPE>> = $EXPECTED; + let func = $FUNC; + + let data_array = $ARGS.iter().map(|arg| arg.data_type()).collect::<Vec<_>>(); + let cardinality = $ARGS + .iter() + .fold(Option::<usize>::None, |acc, arg| match arg { + ColumnarValue::Scalar(_) => acc, + ColumnarValue::Array(a) => Some(a.len()), + }) + .unwrap_or(1); let scalar_arguments = $ARGS.iter().map(|arg| match arg { ColumnarValue::Scalar(scalar) => Some(scalar.clone()), @@ -156,71 +156,70 @@ pub mod test { let field_array = data_array.into_iter().zip(nullables).enumerate() .map(|(idx, (data_type, nullable))| arrow::datatypes::Field::new(format!("field_{idx}"), data_type, nullable)) - .map(std::sync::Arc::new) - .collect::<Vec<_>>(); + .map(std::sync::Arc::new) + .collect::<Vec<_>>(); - let return_field = func.return_field_from_args(datafusion_expr::ReturnFieldArgs { - arg_fields: &field_array, - scalar_arguments: &scalar_arguments_refs, - }); + let return_field = func.return_field_from_args(datafusion_expr::ReturnFieldArgs { + arg_fields: &field_array, + scalar_arguments: &scalar_arguments_refs, + }); let arg_fields = $ARGS.iter() - .enumerate() + .enumerate() .map(|(idx, arg)| arrow::datatypes::Field::new(format!("f_{idx}"), arg.data_type(), true).into()) - .collect::<Vec<_>>(); + .collect::<Vec<_>>(); - match expected { - Ok(expected) => { - assert_eq!(return_field.is_ok(), true); - let return_field = return_field.unwrap(); - let return_type = return_field.data_type(); - assert_eq!(return_type, &$EXPECTED_DATA_TYPE); + match expected { + Ok(expected) => { + assert_eq!(return_field.is_ok(), true); + let return_field = return_field.unwrap(); + let return_type = return_field.data_type(); + assert_eq!(return_type, &$EXPECTED_DATA_TYPE); let result = func.invoke_with_args(datafusion_expr::ScalarFunctionArgs{ - args: $ARGS, - arg_fields, - number_rows: cardinality, - return_field, + args: $ARGS, + arg_fields, + number_rows: cardinality, + return_field, config_options: $CONFIG_OPTIONS - }); + }); assert_eq!(result.is_ok(), true, "function returned an error: {}", result.unwrap_err()); let result = result.unwrap().to_array(cardinality).expect("Failed to convert to array"); let result = result.as_any().downcast_ref::<$ARRAY_TYPE>().expect("Failed to convert to type"); - assert_eq!(result.data_type(), &$EXPECTED_DATA_TYPE); + assert_eq!(result.data_type(), &$EXPECTED_DATA_TYPE); - // value is correct - match expected { - Some(v) => assert_eq!(result.value(0), v), - None => assert!(result.is_null(0)), - }; - } - Err(expected_error) => { - if return_field.is_err() { - match return_field { - Ok(_) => assert!(false, "expected error"), - Err(error) => { datafusion_common::assert_contains!(expected_error.strip_backtrace(), error.strip_backtrace()); } - } - } - else { - let return_field = return_field.unwrap(); - - // invoke is expected error - cannot use .expect_err() due to Debug not being implemented - match func.invoke_with_args(datafusion_expr::ScalarFunctionArgs{ - args: $ARGS, - arg_fields, - number_rows: cardinality, - return_field, - config_options: $CONFIG_OPTIONS}) - { - Ok(_) => assert!(false, "expected error"), - Err(error) => { - assert!(expected_error.strip_backtrace().starts_with(&error.strip_backtrace())); - } + // value is correct + match expected { + Some(v) => assert_eq!(result.value(0), v), + None => assert!(result.is_null(0)), + }; + } + Err(expected_error) => { + if let Ok(return_field) = return_field { + // invoke is expected error - cannot use .expect_err() due to Debug not being implemented + match func.invoke_with_args(datafusion_expr::ScalarFunctionArgs { + args: $ARGS, + arg_fields, + number_rows: cardinality, + return_field, + config_options: $CONFIG_OPTIONS, + }) { + Ok(_) => assert!(false, "expected error"), + Err(error) => { + assert!(expected_error + .strip_backtrace() + .starts_with(&error.strip_backtrace())); } } + } else if let Err(error) = return_field { + datafusion_common::assert_contains!( + expected_error.strip_backtrace(), + error.strip_backtrace() + ); } - }; + } }; + }; ($FUNC:expr, $ARGS:expr, $EXPECTED:expr, $EXPECTED_TYPE:ty, $EXPECTED_DATA_TYPE:expr, $ARRAY_TYPE:ident) => { test_function!( diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs index e6fc006cb2..3ab8b9d26a 100644 --- a/datafusion/optimizer/src/analyzer/type_coercion.rs +++ b/datafusion/optimizer/src/analyzer/type_coercion.rs @@ -744,7 +744,7 @@ fn extract_window_frame_target_type(col_type: &DataType) -> Result<DataType> { } else if let DataType::Dictionary(_, value_type) = col_type { extract_window_frame_target_type(value_type) } else { - return internal_err!("Cannot run range queries on datatype: {col_type:?}"); + internal_err!("Cannot run range queries on datatype: {col_type:?}") } } diff --git a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs index 53d18e7edf..b83fd2d3d7 100644 --- a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs +++ b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs @@ -1888,7 +1888,7 @@ fn are_inlist_and_eq(left: &Expr, right: &Expr) -> bool { } /// Try to convert an expression to an in-list expression -fn as_inlist(expr: &Expr) -> Option<Cow<InList>> { +fn as_inlist(expr: &'_ Expr) -> Option<Cow<'_, InList>> { match expr { Expr::InList(inlist) => Some(Cow::Borrowed(inlist)), Expr::BinaryExpr(BinaryExpr { left, op, right }) if *op == Operator::Eq => { diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs index ff39a851e2..e98fd14cfb 100644 --- a/datafusion/physical-expr-common/src/physical_expr.rs +++ b/datafusion/physical-expr-common/src/physical_expr.rs @@ -110,15 +110,13 @@ pub trait PhysicalExpr: Send + Sync + Display + Debug + DynEq + DynHash { // When the scalar is true or false, skip the scatter process if let Some(v) = value { if *v { - return Ok(ColumnarValue::from( - Arc::new(selection.clone()) as ArrayRef - )); + Ok(ColumnarValue::from(Arc::new(selection.clone()) as ArrayRef)) } else { - return Ok(tmp_result); + Ok(tmp_result) } } else { let array = BooleanArray::from(vec![None; batch.num_rows()]); - return scatter(selection, &array).map(ColumnarValue::Array); + scatter(selection, &array).map(ColumnarValue::Array) } } else { Ok(tmp_result) diff --git a/datafusion/physical-expr/src/intervals/utils.rs b/datafusion/physical-expr/src/intervals/utils.rs index 910631ef4a..22752a00e9 100644 --- a/datafusion/physical-expr/src/intervals/utils.rs +++ b/datafusion/physical-expr/src/intervals/utils.rs @@ -45,13 +45,13 @@ pub fn check_support(expr: &Arc<dyn PhysicalExpr>, schema: &SchemaRef) -> bool { if let Ok(field) = schema.field_with_name(column.name()) { is_datatype_supported(field.data_type()) } else { - return false; + false } } else if let Some(literal) = expr_any.downcast_ref::<Literal>() { if let Ok(dt) = literal.data_type(schema) { is_datatype_supported(&dt) } else { - return false; + false } } else if let Some(cast) = expr_any.downcast_ref::<CastExpr>() { check_support(cast.expr(), schema) diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs index 722bc6049c..5d96ac6dcc 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs @@ -1741,16 +1741,19 @@ mod tests { } fn check_result(actual_batch: &RecordBatch, expected_batch: &RecordBatch) { - let formatted_actual_batch = pretty_format_batches(&[actual_batch.clone()]) - .unwrap() - .to_string(); + let formatted_actual_batch = + pretty_format_batches(std::slice::from_ref(actual_batch)) + .unwrap() + .to_string(); let mut formatted_actual_batch_sorted: Vec<&str> = formatted_actual_batch.trim().lines().collect(); formatted_actual_batch_sorted.sort_unstable(); - let formatted_expected_batch = pretty_format_batches(&[expected_batch.clone()]) - .unwrap() - .to_string(); + let formatted_expected_batch = + pretty_format_batches(std::slice::from_ref(expected_batch)) + .unwrap() + .to_string(); + let mut formatted_expected_batch_sorted: Vec<&str> = formatted_expected_batch.trim().lines().collect(); formatted_expected_batch_sorted.sort_unstable(); diff --git a/datafusion/physical-plan/src/coalesce/mod.rs b/datafusion/physical-plan/src/coalesce/mod.rs index 8e0ba072b7..5962362d76 100644 --- a/datafusion/physical-plan/src/coalesce/mod.rs +++ b/datafusion/physical-plan/src/coalesce/mod.rs @@ -603,7 +603,7 @@ mod tests { } } fn batch_to_pretty_strings(batch: &RecordBatch) -> String { - arrow::util::pretty::pretty_format_batches(&[batch.clone()]) + arrow::util::pretty::pretty_format_batches(std::slice::from_ref(batch)) .unwrap() .to_string() } diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 8157e1b721..066b7a95e9 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -717,8 +717,8 @@ impl RecordBatchStream for FilterExecStream { /// Return the equals Column-Pairs and Non-equals Column-Pairs pub fn collect_columns_from_predicate( - predicate: &Arc<dyn PhysicalExpr>, -) -> EqualAndNonEqual { + predicate: &'_ Arc<dyn PhysicalExpr>, +) -> EqualAndNonEqual<'_> { let mut eq_predicate_columns = Vec::<PhysicalExprPairRef>::new(); let mut ne_predicate_columns = Vec::<PhysicalExprPairRef>::new(); diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs index 5bd6448ed7..c76036a434 100644 --- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs @@ -1078,6 +1078,7 @@ pub mod proto { pub expr: Option<datafusion_proto::protobuf::LogicalExprNode>, } + #[allow(dead_code)] #[derive(Clone, PartialEq, Eq, ::prost::Message)] pub struct TopKExecProto { #[prost(uint64, tag = "1")] diff --git a/datafusion/spark/src/function/utils.rs b/datafusion/spark/src/function/utils.rs index 0db11e6f1b..e272d91d8a 100644 --- a/datafusion/spark/src/function/utils.rs +++ b/datafusion/spark/src/function/utils.rs @@ -34,13 +34,13 @@ pub mod test { .enumerate() .map(|(idx, arg)| { - let nullable = match arg { - datafusion_expr::ColumnarValue::Scalar(scalar) => scalar.is_null(), - datafusion_expr::ColumnarValue::Array(a) => a.null_count() > 0, - }; + let nullable = match arg { + datafusion_expr::ColumnarValue::Scalar(scalar) => scalar.is_null(), + datafusion_expr::ColumnarValue::Array(a) => a.null_count() > 0, + }; std::sync::Arc::new(arrow::datatypes::Field::new(format!("arg_{idx}"), arg.data_type(), nullable)) - }) + }) .collect::<Vec<_>>(); let cardinality = $ARGS @@ -52,8 +52,8 @@ pub mod test { .unwrap_or(1); let scalar_arguments = $ARGS.iter().map(|arg| match arg { - datafusion_expr::ColumnarValue::Scalar(scalar) => Some(scalar.clone()), - datafusion_expr::ColumnarValue::Array(_) => None, + datafusion_expr::ColumnarValue::Scalar(scalar) => Some(scalar.clone()), + datafusion_expr::ColumnarValue::Array(_) => None, }).collect::<Vec<_>>(); let scalar_arguments_refs = scalar_arguments.iter().map(|arg| arg.as_ref()).collect::<Vec<_>>(); @@ -65,43 +65,56 @@ pub mod test { match expected { Ok(expected) => { - let return_field = return_field.unwrap(); - assert_eq!(return_field.data_type(), &$EXPECTED_DATA_TYPE); - - let result = func.invoke_with_args(datafusion_expr::ScalarFunctionArgs{ - args: $ARGS, - number_rows: cardinality, - return_field, - arg_fields: arg_fields.clone(), - config_options: $CONFIG_OPTIONS, - }); - assert_eq!(result.is_ok(), true, "function returned an error: {}", result.unwrap_err()); + if let Ok(return_field) = return_field { + assert_eq!(return_field.data_type(), &$EXPECTED_DATA_TYPE); - let result = result.unwrap().to_array(cardinality).expect("Failed to convert to array"); - let result = result.as_any().downcast_ref::<$ARRAY_TYPE>().expect("Failed to convert to type"); - assert_eq!(result.data_type(), &$EXPECTED_DATA_TYPE); + match func.invoke_with_args(datafusion_expr::ScalarFunctionArgs{ + args: $ARGS, + number_rows: cardinality, + return_field, + arg_fields: arg_fields.clone(), + config_options: $CONFIG_OPTIONS, + }) { + Ok(col_value) => { + match col_value.to_array(cardinality) { + Ok(array) => { + let result = array + .as_any() + .downcast_ref::<$ARRAY_TYPE>() + .expect("Failed to convert to type"); + assert_eq!(result.data_type(), &$EXPECTED_DATA_TYPE); - // value is correct - match expected { - Some(v) => assert_eq!(result.value(0), v), - None => assert!(result.is_null(0)), - }; - } - Err(expected_error) => { - if return_field.is_err() { - match return_field { - Ok(_) => assert!(false, "expected error"), - Err(error) => { datafusion_common::assert_contains!(expected_error.strip_backtrace(), error.strip_backtrace()); } + // value is correct + match expected { + Some(v) => assert_eq!(result.value(0), v), + None => assert!(result.is_null(0)), + }; + } + Err(err) => { + panic!("Failed to convert to array: {err}"); + } + } + } + Err(err) => { + panic!("function returned an error: {err}"); + } } + } else { + panic!("Expected return_field to be Ok but got Err"); } - else { - let return_field = return_field.unwrap(); - + } + Err(expected_error) => { + if let Err(error) = &return_field { + datafusion_common::assert_contains!( + expected_error.strip_backtrace(), + error.strip_backtrace() + ); + } else if let Ok(value) = return_field { // invoke is expected error - cannot use .expect_err() due to Debug not being implemented - match func.invoke_with_args(datafusion_expr::ScalarFunctionArgs{ + match func.invoke_with_args(datafusion_expr::ScalarFunctionArgs { args: $ARGS, number_rows: cardinality, - return_field, + return_field: value, arg_fields, config_options: $CONFIG_OPTIONS, }) { diff --git a/datafusion/sql/src/unparser/utils.rs b/datafusion/sql/src/unparser/utils.rs index 89fa392c18..8b3791017a 100644 --- a/datafusion/sql/src/unparser/utils.rs +++ b/datafusion/sql/src/unparser/utils.rs @@ -203,7 +203,7 @@ pub(crate) fn unproject_agg_exprs( windows.and_then(|w| find_window_expr(w, &c.name).cloned()) { // Window function can contain an aggregation columns, e.g., 'avg(sum(ss_sales_price)) over ...' that needs to be unprojected - return Ok(Transformed::yes(unproject_agg_exprs(unprojected_expr, agg, None)?)); + Ok(Transformed::yes(unproject_agg_exprs(unprojected_expr, agg, None)?)) } else { internal_err!( "Tried to unproject agg expr for column '{}' that was not found in the provided Aggregate!", &c.name diff --git a/rust-toolchain.toml b/rust-toolchain.toml index f772c0987b..55d572362d 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -19,5 +19,5 @@ # to compile this workspace and run CI jobs. [toolchain] -channel = "1.88.0" +channel = "1.89.0" components = ["rustfmt", "clippy"] --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org