This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
     new b064930e5 chore: Fix merge conflicts from merging comet-parquet-exec into main (#1323)
b064930e5 is described below

commit b064930e5ee5124d1c35adbf9338c37e2933f83d
Author: Matt Butrovich <mbutrov...@users.noreply.github.com>
AuthorDate: Thu Jan 23 18:23:06 2025 -0500

    chore: Fix merge conflicts from merging comet-parquet-exec into main (#1323)

    * Remove extra files after merging main into comet-parquet-exec.
    * Clean up imports in CometTestBase.
---
 native/spark-expr/src/lib.rs                       |   2 -
 native/spark-expr/src/schema_adapter.rs            | 376 ---------------------
 native/spark-expr/src/variance.rs                  | 247 --------------
 .../scala/org/apache/spark/sql/CometTestBase.scala |   7 +-
 4 files changed, 3 insertions(+), 629 deletions(-)

diff --git a/native/spark-expr/src/lib.rs b/native/spark-expr/src/lib.rs
index c9cfab27d..9bf6bb24f 100644
--- a/native/spark-expr/src/lib.rs
+++ b/native/spark-expr/src/lib.rs
@@ -22,9 +22,7 @@ mod error;
 mod kernels;
-mod schema_adapter;
 mod static_invoke;
-pub use schema_adapter::SparkSchemaAdapterFactory;
 pub use static_invoke::*;
 mod struct_funcs;
diff --git a/native/spark-expr/src/schema_adapter.rs b/native/spark-expr/src/schema_adapter.rs
deleted file mode 100644
index 161ad6f16..000000000
--- a/native/spark-expr/src/schema_adapter.rs
+++ /dev/null
@@ -1,376 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Custom schema adapter that uses Spark-compatible casts
-
-use crate::cast::cast_supported;
-use crate::{spark_cast, SparkCastOptions};
-use arrow_array::{new_null_array, Array, RecordBatch, RecordBatchOptions};
-use arrow_schema::{Schema, SchemaRef};
-use datafusion::datasource::schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper};
-use datafusion_common::plan_err;
-use datafusion_expr::ColumnarValue;
-use std::sync::Arc;
-
-/// An implementation of DataFusion's `SchemaAdapterFactory` that uses a Spark-compatible
-/// `cast` implementation.
-#[derive(Clone, Debug)]
-pub struct SparkSchemaAdapterFactory {
-    /// Spark cast options
-    cast_options: SparkCastOptions,
-}
-
-impl SparkSchemaAdapterFactory {
-    pub fn new(options: SparkCastOptions) -> Self {
-        Self {
-            cast_options: options,
-        }
-    }
-}
-
-impl SchemaAdapterFactory for SparkSchemaAdapterFactory {
-    /// Create a new factory for mapping batches from a file schema to a table
-    /// schema.
-    ///
-    /// This is a convenience for [`DefaultSchemaAdapterFactory::create`] with
-    /// the same schema for both the projected table schema and the table
-    /// schema.
-    fn create(
-        &self,
-        required_schema: SchemaRef,
-        table_schema: SchemaRef,
-    ) -> Box<dyn SchemaAdapter> {
-        Box::new(SparkSchemaAdapter {
-            required_schema,
-            table_schema,
-            cast_options: self.cast_options.clone(),
-        })
-    }
-}
-
-/// This SchemaAdapter requires both the table schema and the projected table
-/// schema. See [`SchemaMapping`] for more details
-#[derive(Clone, Debug)]
-pub struct SparkSchemaAdapter {
-    /// The schema for the table, projected to include only the fields being output (projected) by the
-    /// associated ParquetExec
-    required_schema: SchemaRef,
-    /// The entire table schema for the table we're using this to adapt.
-    ///
-    /// This is used to evaluate any filters pushed down into the scan
-    /// which may refer to columns that are not referred to anywhere
-    /// else in the plan.
-    table_schema: SchemaRef,
-    /// Spark cast options
-    cast_options: SparkCastOptions,
-}
-
-impl SchemaAdapter for SparkSchemaAdapter {
-    /// Map a column index in the table schema to a column index in a particular
-    /// file schema
-    ///
-    /// Panics if index is not in range for the table schema
-    fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
-        let field = self.required_schema.field(index);
-        Some(file_schema.fields.find(field.name())?.0)
-    }
-
-    /// Creates a `SchemaMapping` for casting or mapping the columns from the
-    /// file schema to the table schema.
-    ///
-    /// If the provided `file_schema` contains columns of a different type to
-    /// the expected `table_schema`, the method will attempt to cast the array
-    /// data from the file schema to the table schema where possible.
-    ///
-    /// Returns a [`SchemaMapping`] that can be applied to the output batch
-    /// along with an ordered list of columns to project from the file
-    fn map_schema(
-        &self,
-        file_schema: &Schema,
-    ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
-        let mut projection = Vec::with_capacity(file_schema.fields().len());
-        let mut field_mappings = vec![None; self.required_schema.fields().len()];
-
-        for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
-            if let Some((table_idx, table_field)) =
-                self.required_schema.fields().find(file_field.name())
-            {
-                if cast_supported(
-                    file_field.data_type(),
-                    table_field.data_type(),
-                    &self.cast_options,
-                ) {
-                    field_mappings[table_idx] = Some(projection.len());
-                    projection.push(file_idx);
-                } else {
-                    return plan_err!(
-                        "Cannot cast file schema field {} of type {:?} to required schema field of type {:?}",
-                        file_field.name(),
-                        file_field.data_type(),
-                        table_field.data_type()
-                    );
-                }
-            }
-        }
-
-        Ok((
-            Arc::new(SchemaMapping {
-                required_schema: Arc::<Schema>::clone(&self.required_schema),
-                field_mappings,
-                table_schema: Arc::<Schema>::clone(&self.table_schema),
-                cast_options: self.cast_options.clone(),
-            }),
-            projection,
-        ))
-    }
-}
-
-// TODO SchemaMapping is mostly copied from DataFusion but calls spark_cast
-// instead of arrow cast - can we reduce the amount of code copied here and make
-// the DataFusion version more extensible?
-
-/// The SchemaMapping struct holds a mapping from the file schema to the table
-/// schema and any necessary type conversions.
-///
-/// Note, because `map_batch` and `map_partial_batch` functions have different
-/// needs, this struct holds two schemas:
-///
-/// 1. The projected **table** schema
-/// 2. The full table schema
-///
-/// [`map_batch`] is used by the ParquetOpener to produce a RecordBatch which
-/// has the projected schema, since that's the schema which is supposed to come
-/// out of the execution of this query. Thus `map_batch` uses
-/// `projected_table_schema` as it can only operate on the projected fields.
-///
-/// [`map_partial_batch`] is used to create a RecordBatch with a schema that
-/// can be used for Parquet predicate pushdown, meaning that it may contain
-/// fields which are not in the projected schema (as the fields that parquet
-/// pushdown filters operate can be completely distinct from the fields that are
-/// projected (output) out of the ParquetExec). `map_partial_batch` thus uses
-/// `table_schema` to create the resulting RecordBatch (as it could be operating
-/// on any fields in the schema).
-///
-/// [`map_batch`]: Self::map_batch
-/// [`map_partial_batch`]: Self::map_partial_batch
-#[derive(Debug)]
-pub struct SchemaMapping {
-    /// The schema of the table. This is the expected schema after conversion
-    /// and it should match the schema of the query result.
-    required_schema: SchemaRef,
-    /// Mapping from field index in `projected_table_schema` to index in
-    /// projected file_schema.
-    ///
-    /// They are Options instead of just plain `usize`s because the table could
-    /// have fields that don't exist in the file.
-    field_mappings: Vec<Option<usize>>,
-    /// The entire table schema, as opposed to the projected_table_schema (which
-    /// only contains the columns that we are projecting out of this query).
-    /// This contains all fields in the table, regardless of if they will be
-    /// projected out or not.
-    table_schema: SchemaRef,
-    /// Spark cast options
-    cast_options: SparkCastOptions,
-}
-
-impl SchemaMapper for SchemaMapping {
-    /// Adapts a `RecordBatch` to match the `projected_table_schema` using the stored mapping and
-    /// conversions. The produced RecordBatch has a schema that contains only the projected
-    /// columns, so if one needs a RecordBatch with a schema that references columns which are not
-    /// in the projected, it would be better to use `map_partial_batch`
-    fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch> {
-        let batch_rows = batch.num_rows();
-        let batch_cols = batch.columns().to_vec();
-
-        let cols = self
-            .required_schema
-            // go through each field in the projected schema
-            .fields()
-            .iter()
-            // and zip it with the index that maps fields from the projected table schema to the
-            // projected file schema in `batch`
-            .zip(&self.field_mappings)
-            // and for each one...
-            .map(|(field, file_idx)| {
-                file_idx.map_or_else(
-                    // If this field only exists in the table, and not in the file, then we know
-                    // that it's null, so just return that.
-                    || Ok(new_null_array(field.data_type(), batch_rows)),
-                    // However, if it does exist in both, then try to cast it to the correct output
-                    // type
-                    |batch_idx| {
-                        spark_cast(
-                            ColumnarValue::Array(Arc::clone(&batch_cols[batch_idx])),
-                            field.data_type(),
-                            &self.cast_options,
-                        )?
-                        .into_array(batch_rows)
-                    },
-                )
-            })
-            .collect::<datafusion_common::Result<Vec<_>, _>>()?;
-
-        // Necessary to handle empty batches
-        let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
-
-        let schema = Arc::<Schema>::clone(&self.required_schema);
-        let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?;
-        Ok(record_batch)
-    }
-
-    /// Adapts a [`RecordBatch`]'s schema into one that has all the correct output types and only
-    /// contains the fields that exist in both the file schema and table schema.
-    ///
-    /// Unlike `map_batch` this method also preserves the columns that
-    /// may not appear in the final output (`projected_table_schema`) but may
-    /// appear in push down predicates
-    fn map_partial_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch> {
-        let batch_cols = batch.columns().to_vec();
-        let schema = batch.schema();
-
-        // for each field in the batch's schema (which is based on a file, not a table)...
-        let (cols, fields) = schema
-            .fields()
-            .iter()
-            .zip(batch_cols.iter())
-            .flat_map(|(field, batch_col)| {
-                self.table_schema
-                    // try to get the same field from the table schema that we have stored in self
-                    .field_with_name(field.name())
-                    // and if we don't have it, that's fine, ignore it. This may occur when we've
-                    // created an external table whose fields are a subset of the fields in this
-                    // file, then tried to read data from the file into this table. If that is the
-                    // case here, it's fine to ignore because we don't care about this field
-                    // anyways
-                    .ok()
-                    // but if we do have it,
-                    .map(|table_field| {
-                        // try to cast it into the correct output type. we don't want to ignore this
-                        // error, though, so it's propagated.
-                        spark_cast(
-                            ColumnarValue::Array(Arc::clone(batch_col)),
-                            table_field.data_type(),
-                            &self.cast_options,
-                        )?
-                        .into_array(batch_col.len())
-                        // and if that works, return the field and column.
-                        .map(|new_col| (new_col, table_field.clone()))
-                    })
-            })
-            .collect::<Result<Vec<_>, _>>()?
-            .into_iter()
-            .unzip::<_, _, Vec<_>, Vec<_>>();
-
-        // Necessary to handle empty batches
-        let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
-
-        let schema = Arc::new(Schema::new_with_metadata(fields, schema.metadata().clone()));
-        let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?;
-        Ok(record_batch)
-    }
-}
-
-#[cfg(test)]
-mod test {
-    use crate::test_common::file_util::get_temp_filename;
-    use crate::{EvalMode, SparkCastOptions, SparkSchemaAdapterFactory};
-    use arrow::array::{Int32Array, StringArray};
-    use arrow::datatypes::{DataType, Field, Schema};
-    use arrow::record_batch::RecordBatch;
-    use arrow_array::UInt32Array;
-    use arrow_schema::SchemaRef;
-    use datafusion::datasource::listing::PartitionedFile;
-    use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
-    use datafusion::execution::object_store::ObjectStoreUrl;
-    use datafusion::execution::TaskContext;
-    use datafusion::physical_plan::ExecutionPlan;
-    use datafusion_common::DataFusionError;
-    use futures::StreamExt;
-    use parquet::arrow::ArrowWriter;
-    use std::fs::File;
-    use std::sync::Arc;
-
-    #[tokio::test]
-    async fn parquet_roundtrip_int_as_string() -> Result<(), DataFusionError> {
-        let file_schema = Arc::new(Schema::new(vec![
-            Field::new("id", DataType::Int32, false),
-            Field::new("name", DataType::Utf8, false),
-        ]));
-
-        let ids = Arc::new(Int32Array::from(vec![1, 2, 3])) as Arc<dyn arrow::array::Array>;
-        let names = Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"]))
-            as Arc<dyn arrow::array::Array>;
-        let batch = RecordBatch::try_new(Arc::clone(&file_schema), vec![ids, names])?;
-
-        let required_schema = Arc::new(Schema::new(vec![
-            Field::new("id", DataType::Utf8, false),
-            Field::new("name", DataType::Utf8, false),
-        ]));
-
-        let _ = roundtrip(&batch, required_schema).await?;
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn parquet_roundtrip_unsigned_int() -> Result<(), DataFusionError> {
-        let file_schema = Arc::new(Schema::new(vec![Field::new("id", DataType::UInt32, false)]));
-
-        let ids = Arc::new(UInt32Array::from(vec![1, 2, 3])) as Arc<dyn arrow::array::Array>;
-        let batch = RecordBatch::try_new(Arc::clone(&file_schema), vec![ids])?;
-
-        let required_schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
-
-        let _ = roundtrip(&batch, required_schema).await?;
-
-        Ok(())
-    }
-
-    /// Create a Parquet file containing a single batch and then read the batch back using
-    /// the specified required_schema. This will cause the SchemaAdapter code to be used.
-    async fn roundtrip(
-        batch: &RecordBatch,
-        required_schema: SchemaRef,
-    ) -> Result<RecordBatch, DataFusionError> {
-        let filename = get_temp_filename();
-        let filename = filename.as_path().as_os_str().to_str().unwrap().to_string();
-        let file = File::create(&filename)?;
-        let mut writer = ArrowWriter::try_new(file, Arc::clone(&batch.schema()), None)?;
-        writer.write(batch)?;
-        writer.close()?;
-
-        let object_store_url = ObjectStoreUrl::local_filesystem();
-        let file_scan_config = FileScanConfig::new(object_store_url, required_schema)
-            .with_file_groups(vec![vec![PartitionedFile::from_path(
-                filename.to_string(),
-            )?]]);
-
-        let mut spark_cast_options = SparkCastOptions::new(EvalMode::Legacy, "UTC", false);
-        spark_cast_options.allow_cast_unsigned_ints = true;
-
-        let parquet_exec = ParquetExec::builder(file_scan_config)
-            .with_schema_adapter_factory(Arc::new(SparkSchemaAdapterFactory::new(
-                spark_cast_options,
-            )))
-            .build();
-
-        let mut stream = parquet_exec
-            .execute(0, Arc::new(TaskContext::default()))
-            .unwrap();
-        stream.next().await.unwrap()
-    }
-}
diff --git a/native/spark-expr/src/variance.rs b/native/spark-expr/src/variance.rs
deleted file mode 100644
index e71d713f5..000000000
--- a/native/spark-expr/src/variance.rs
+++ /dev/null
@@ -1,247 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::any::Any;
-
-use arrow::{
-    array::{ArrayRef, Float64Array},
-    datatypes::{DataType, Field},
-};
-use datafusion::logical_expr::Accumulator;
-use datafusion_common::{downcast_value, DataFusionError, Result, ScalarValue};
-use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs};
-use datafusion_expr::Volatility::Immutable;
-use datafusion_expr::{AggregateUDFImpl, Signature};
-use datafusion_physical_expr::expressions::format_state_name;
-use datafusion_physical_expr::expressions::StatsType;
-
-/// VAR_SAMP and VAR_POP aggregate expression
-/// The implementation mostly is the same as the DataFusion's implementation. The reason
-/// we have our own implementation is that DataFusion has UInt64 for state_field `count`,
-/// while Spark has Double for count. Also we have added `null_on_divide_by_zero`
-/// to be consistent with Spark's implementation.
-#[derive(Debug)]
-pub struct Variance {
-    name: String,
-    signature: Signature,
-    stats_type: StatsType,
-    null_on_divide_by_zero: bool,
-}
-
-impl Variance {
-    /// Create a new VARIANCE aggregate function
-    pub fn new(
-        name: impl Into<String>,
-        data_type: DataType,
-        stats_type: StatsType,
-        null_on_divide_by_zero: bool,
-    ) -> Self {
-        // the result of variance just support FLOAT64 data type.
-        assert!(matches!(data_type, DataType::Float64));
-        Self {
-            name: name.into(),
-            signature: Signature::numeric(1, Immutable),
-            stats_type,
-            null_on_divide_by_zero,
-        }
-    }
-}
-
-impl AggregateUDFImpl for Variance {
-    /// Return a reference to Any that can be used for downcasting
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn name(&self) -> &str {
-        &self.name
-    }
-
-    fn signature(&self) -> &Signature {
-        &self.signature
-    }
-
-    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
-        Ok(DataType::Float64)
-    }
-
-    fn accumulator(&self, _acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
-        Ok(Box::new(VarianceAccumulator::try_new(
-            self.stats_type,
-            self.null_on_divide_by_zero,
-        )?))
-    }
-
-    fn create_sliding_accumulator(&self, _args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
-        Ok(Box::new(VarianceAccumulator::try_new(
-            self.stats_type,
-            self.null_on_divide_by_zero,
-        )?))
-    }
-
-    fn state_fields(&self, _args: StateFieldsArgs) -> Result<Vec<Field>> {
-        Ok(vec![
-            Field::new(
-                format_state_name(&self.name, "count"),
-                DataType::Float64,
-                true,
-            ),
-            Field::new(
-                format_state_name(&self.name, "mean"),
-                DataType::Float64,
-                true,
-            ),
-            Field::new(format_state_name(&self.name, "m2"), DataType::Float64, true),
-        ])
-    }
-
-    fn default_value(&self, _data_type: &DataType) -> Result<ScalarValue> {
-        Ok(ScalarValue::Float64(None))
-    }
-}
-
-/// An accumulator to compute variance
-#[derive(Debug)]
-pub struct VarianceAccumulator {
-    m2: f64,
-    mean: f64,
-    count: f64,
-    stats_type: StatsType,
-    null_on_divide_by_zero: bool,
-}
-
-impl VarianceAccumulator {
-    /// Creates a new `VarianceAccumulator`
-    pub fn try_new(s_type: StatsType, null_on_divide_by_zero: bool) -> Result<Self> {
-        Ok(Self {
-            m2: 0_f64,
-            mean: 0_f64,
-            count: 0_f64,
-            stats_type: s_type,
-            null_on_divide_by_zero,
-        })
-    }
-
-    pub fn get_count(&self) -> f64 {
-        self.count
-    }
-
-    pub fn get_mean(&self) -> f64 {
-        self.mean
-    }
-
-    pub fn get_m2(&self) -> f64 {
-        self.m2
-    }
-}
-
-impl Accumulator for VarianceAccumulator {
-    fn state(&mut self) -> Result<Vec<ScalarValue>> {
-        Ok(vec![
-            ScalarValue::from(self.count),
-            ScalarValue::from(self.mean),
-            ScalarValue::from(self.m2),
-        ])
-    }
-
-    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
-        let arr = downcast_value!(&values[0], Float64Array).iter().flatten();
-
-        for value in arr {
-            let new_count = self.count + 1.0;
-            let delta1 = value - self.mean;
-            let new_mean = delta1 / new_count + self.mean;
-            let delta2 = value - new_mean;
-            let new_m2 = self.m2 + delta1 * delta2;
-
-            self.count += 1.0;
-            self.mean = new_mean;
-            self.m2 = new_m2;
-        }
-
-        Ok(())
-    }
-
-    fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
-        let arr = downcast_value!(&values[0], Float64Array).iter().flatten();
-
-        for value in arr {
-            let new_count = self.count - 1.0;
-            let delta1 = self.mean - value;
-            let new_mean = delta1 / new_count + self.mean;
-            let delta2 = new_mean - value;
-            let new_m2 = self.m2 - delta1 * delta2;
-
-            self.count -= 1.0;
-            self.mean = new_mean;
-            self.m2 = new_m2;
-        }
-
-        Ok(())
-    }
-
-    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
-        let counts = downcast_value!(states[0], Float64Array);
-        let means = downcast_value!(states[1], Float64Array);
-        let m2s = downcast_value!(states[2], Float64Array);
-
-        for i in 0..counts.len() {
-            let c = counts.value(i);
-            if c == 0_f64 {
-                continue;
-            }
-            let new_count = self.count + c;
-            let new_mean = self.mean * self.count / new_count + means.value(i) * c /
-                new_count;
-            let delta = self.mean - means.value(i);
-            let new_m2 = self.m2 + m2s.value(i) + delta * delta * self.count * c / new_count;
-
-            self.count = new_count;
-            self.mean = new_mean;
-            self.m2 = new_m2;
-        }
-        Ok(())
-    }
-
-    fn evaluate(&mut self) -> Result<ScalarValue> {
-        let count = match self.stats_type {
-            StatsType::Population => self.count,
-            StatsType::Sample => {
-                if self.count > 0.0 {
-                    self.count - 1.0
-                } else {
-                    self.count
-                }
-            }
-        };
-
-        Ok(ScalarValue::Float64(match self.count {
-            count if count == 0.0 => None,
-            count if count == 1.0 && StatsType::Sample == self.stats_type => {
-                if self.null_on_divide_by_zero {
-                    None
-                } else {
-                    Some(f64::NAN)
-                }
-            }
-            _ => Some(self.m2 / count),
-        }))
-    }
-
-    fn size(&self) -> usize {
-        std::mem::size_of_val(self)
-    }
-}
diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
index 96ee3846b..215f0c876 100644
--- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
@@ -36,14 +36,13 @@ import org.apache.parquet.hadoop.example.ExampleParquetWriter
 import org.apache.parquet.schema.{MessageType, MessageTypeParser}
 import org.apache.spark._
 import org.apache.spark.internal.config.{MEMORY_OFFHEAP_ENABLED, MEMORY_OFFHEAP_SIZE, SHUFFLE_MANAGER}
-import org.apache.spark.sql.comet.{CometBatchScanExec, CometBroadcastExchangeExec, CometColumnarToRowExec, CometExec, CometNativeScanExec, CometScanExec, CometScanWrapper, CometSinkPlaceHolder, CometSparkToColumnarExec}
+import org.apache.spark.sql.comet._
 import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, CometNativeShuffle, CometShuffleExchangeExec}
-import org.apache.spark.sql.execution.{ColumnarToRowExec, ExtendedMode, InputAdapter, SparkPlan, WholeStageCodegenExec}
+import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.internal._
 import org.apache.spark.sql.test._
-import org.apache.spark.sql.types.DecimalType
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{DecimalType, StructType}
 import org.apache.comet._
 import org.apache.comet.CometSparkSessionExtensions.isSpark34Plus

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org
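The removed schema_adapter.rs above centers on one idea: for each column in the required (projected) schema, either cast the matching file column to the required type or fill the column with nulls when the file does not contain it. As a rough illustration of that shape only, here is a minimal, self-contained Rust sketch that uses the stock arrow cast kernel rather than Comet's spark_cast; the function name adapt_batch and the example schemas are invented for this sketch and are not part of the commit.

// Illustrative sketch, not Comet code: adapt a file batch to a required schema
// by casting matching columns and null-filling missing ones.
use std::sync::Arc;

use arrow::array::{new_null_array, ArrayRef, Int32Array};
use arrow::compute::cast;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

fn adapt_batch(batch: &RecordBatch, required: &Schema) -> Result<RecordBatch, ArrowError> {
    let cols: Vec<ArrayRef> = required
        .fields()
        .iter()
        .map(|field| match batch.schema().column_with_name(field.name()) {
            // Column exists in the file: cast it to the required type.
            Some((idx, _)) => cast(batch.column(idx), field.data_type()),
            // Column only exists in the required schema: fill it with nulls.
            None => Ok(new_null_array(field.data_type(), batch.num_rows())),
        })
        .collect::<Result<_, _>>()?;
    RecordBatch::try_new(Arc::new(required.clone()), cols)
}

fn main() -> Result<(), ArrowError> {
    let file_schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        Arc::clone(&file_schema),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
    )?;

    // The required schema asks for `id` as Utf8 plus a `name` column the file lacks.
    let required = Schema::new(vec![
        Field::new("id", DataType::Utf8, false),
        Field::new("name", DataType::Utf8, true),
    ]);

    let adapted = adapt_batch(&batch, &required)?;
    println!("{:?}", adapted.schema());
    Ok(())
}

The removed SparkSchemaAdapter differs from this sketch in that it checks cast_supported while building the mapping and returns a plan error for unsupported conversions instead of failing during execution, and it performs the conversion with Spark-compatible cast semantics.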
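The removed variance.rs follows the usual single-pass variance formulation: update_batch applies Welford's online update to a (count, mean, m2) triple, and merge_batch combines triples produced by different partitions. The following is a minimal standalone sketch of those two formulas in plain Rust, using invented names (VarState, sample_variance) and no DataFusion Accumulator trait.

// Illustrative sketch: Welford-style update and merge of (count, mean, m2) states,
// mirroring the arithmetic in the removed VarianceAccumulator.
#[derive(Debug, Default, Clone, Copy)]
struct VarState {
    count: f64,
    mean: f64,
    m2: f64,
}

impl VarState {
    // Welford's online update for a single value.
    fn update(&mut self, value: f64) {
        let new_count = self.count + 1.0;
        let delta1 = value - self.mean;
        let new_mean = self.mean + delta1 / new_count;
        let delta2 = value - new_mean;
        self.count = new_count;
        self.mean = new_mean;
        self.m2 += delta1 * delta2;
    }

    // Merge a partial state from another partition.
    fn merge(&mut self, other: &VarState) {
        if other.count == 0.0 {
            return;
        }
        let new_count = self.count + other.count;
        let delta = self.mean - other.mean;
        let new_mean = (self.mean * self.count + other.mean * other.count) / new_count;
        self.m2 += other.m2 + delta * delta * self.count * other.count / new_count;
        self.count = new_count;
        self.mean = new_mean;
    }

    // Sample variance (VAR_SAMP): m2 / (count - 1), undefined for count <= 1.
    fn sample_variance(&self) -> Option<f64> {
        if self.count > 1.0 {
            Some(self.m2 / (self.count - 1.0))
        } else {
            None
        }
    }
}

fn main() {
    let data = [1.0_f64, 2.0, 4.0, 8.0, 16.0, 32.0];

    // Accumulate two halves separately, then merge, as a shuffle would.
    let (left, right) = data.split_at(3);
    let mut a = VarState::default();
    let mut b = VarState::default();
    left.iter().for_each(|v| a.update(*v));
    right.iter().for_each(|v| b.update(*v));
    a.merge(&b);

    println!("merged sample variance: {:?}", a.sample_variance());
}

The count <= 1 case is where the null_on_divide_by_zero flag in the removed code applies: for sample variance it chooses between returning null and returning NaN to match Spark's behavior.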