This is an automated email from the ASF dual-hosted git repository.

xudong963 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 21248fbbb4 Change default SQL mapping for `VARCAHR` from `Utf8` to  
`Utf8View` (#16142)
21248fbbb4 is described below

commit 21248fbbb4db9f30a3bbc92561b29a71ab0992bd
Author: Qi Zhu <821684...@qq.com>
AuthorDate: Fri May 30 13:44:16 2025 +0800

    Change default SQL mapping for `VARCAHR` from `Utf8` to  `Utf8View` (#16142)
    
    * Change default mapping of SQL VARCHAR from Utf8 to Utf8View
    
    * Fix test
    
    * Fix doc
    
    * Add utf8view support for user defined
    
    * update testing data
    
    * fix
    
    * clippy
    
    * fix
    
    * Fix
    
    * Fix fmt
    
    * Fix
    
    * Fix slt testing
    
    * add doc for avro utf8view
    
    * Support utf8view for STRING_AGG
    
    * Address comments
    
    * fmt
---
 datafusion-examples/examples/dataframe.rs          | 17 +++++---
 datafusion/common/src/config.rs                    |  2 +-
 datafusion/core/tests/dataframe/mod.rs             |  5 +++
 datafusion/core/tests/sql/explain_analyze.rs       | 48 +++++++++++-----------
 .../core/tests/user_defined/user_defined_plan.rs   | 40 +++++++++++-------
 datafusion/functions-aggregate/src/string_agg.rs   | 12 +++++-
 datafusion/sql/src/planner.rs                      |  2 +-
 datafusion/sql/tests/sql_integration.rs            |  6 +--
 .../src/engines/datafusion_engine/normalize.rs     |  4 +-
 datafusion/sqllogictest/test_files/aggregate.slt   | 21 ++++++++--
 datafusion/sqllogictest/test_files/avro.slt        |  4 ++
 datafusion/sqllogictest/test_files/coalesce.slt    |  4 +-
 datafusion/sqllogictest/test_files/ddl.slt         |  6 +--
 datafusion/sqllogictest/test_files/delete.slt      |  6 +--
 .../test_files/filter_without_sort_exec.slt        | 14 +++----
 datafusion/sqllogictest/test_files/imdb.slt        |  2 +-
 .../sqllogictest/test_files/information_schema.slt |  4 +-
 datafusion/sqllogictest/test_files/join.slt.part   |  8 ++--
 datafusion/sqllogictest/test_files/joins.slt       | 14 +++----
 .../test_files/min_max/fixed_size_list.slt         |  1 -
 .../sqllogictest/test_files/min_max/large_list.slt |  5 ---
 .../sqllogictest/test_files/min_max/list.slt       |  1 -
 datafusion/sqllogictest/test_files/order.slt       |  4 +-
 .../test_files/parquet_filter_pushdown.slt         |  4 +-
 datafusion/sqllogictest/test_files/predicates.slt  | 10 ++---
 datafusion/sqllogictest/test_files/struct.slt      | 30 +++++++-------
 .../sqllogictest/test_files/subquery_sort.slt      |  7 ++--
 .../test_files/tpch/plans/q10.slt.part             |  4 +-
 .../test_files/tpch/plans/q11.slt.part             |  8 ++--
 .../test_files/tpch/plans/q12.slt.part             |  6 +--
 .../test_files/tpch/plans/q13.slt.part             |  4 +-
 .../test_files/tpch/plans/q14.slt.part             |  2 +-
 .../test_files/tpch/plans/q16.slt.part             |  8 ++--
 .../test_files/tpch/plans/q17.slt.part             |  4 +-
 .../test_files/tpch/plans/q19.slt.part             | 14 +++----
 .../sqllogictest/test_files/tpch/plans/q2.slt.part | 12 +++---
 .../test_files/tpch/plans/q20.slt.part             |  8 ++--
 .../test_files/tpch/plans/q21.slt.part             |  8 ++--
 .../sqllogictest/test_files/tpch/plans/q3.slt.part |  4 +-
 .../sqllogictest/test_files/tpch/plans/q5.slt.part |  4 +-
 .../sqllogictest/test_files/tpch/plans/q7.slt.part | 10 ++---
 .../sqllogictest/test_files/tpch/plans/q8.slt.part | 10 ++---
 .../sqllogictest/test_files/tpch/plans/q9.slt.part |  4 +-
 datafusion/sqllogictest/test_files/union.slt       | 12 +++---
 .../sqllogictest/test_files/union_by_name.slt      | 12 +++---
 datafusion/sqllogictest/test_files/update.slt      |  8 ++--
 datafusion/sqllogictest/test_files/window.slt      |  4 +-
 docs/source/user-guide/configs.md                  |  2 +-
 parquet-testing                                    |  2 +-
 49 files changed, 238 insertions(+), 193 deletions(-)

diff --git a/datafusion-examples/examples/dataframe.rs 
b/datafusion-examples/examples/dataframe.rs
index 0ddf6aa2d0..57a28aeca0 100644
--- a/datafusion-examples/examples/dataframe.rs
+++ b/datafusion-examples/examples/dataframe.rs
@@ -15,8 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray};
+use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray, 
StringViewArray};
 use datafusion::arrow::datatypes::{DataType, Field, Schema};
+use datafusion::catalog::MemTable;
 use datafusion::common::config::CsvOptions;
 use datafusion::common::parsers::CompressionTypeVariant;
 use datafusion::common::DataFusionError;
@@ -198,10 +199,16 @@ async fn read_memory_macro() -> Result<()> {
 /// 3. Write out a DataFrame to a csv file
 /// 4. Write out a DataFrame to a json file
 async fn write_out(ctx: &SessionContext) -> std::result::Result<(), 
DataFusionError> {
-    let mut df = ctx.sql("values ('a'), ('b'), ('c')").await.unwrap();
-
-    // Ensure the column names and types match the target table
-    df = df.with_column_renamed("column1", "tablecol1").unwrap();
+    let array = StringViewArray::from(vec!["a", "b", "c"]);
+    let schema = Arc::new(Schema::new(vec![Field::new(
+        "tablecol1",
+        DataType::Utf8View,
+        false,
+    )]));
+    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)])?;
+    let mem_table = MemTable::try_new(schema.clone(), vec![vec![batch]])?;
+    ctx.register_table("initial_data", Arc::new(mem_table))?;
+    let df = ctx.table("initial_data").await?;
 
     ctx.sql(
         "create external table
diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index 59283114e3..726015d171 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -262,7 +262,7 @@ config_namespace! {
        /// If true, `VARCHAR` is mapped to `Utf8View` during SQL planning.
        /// If false, `VARCHAR` is mapped to `Utf8`  during SQL planning.
        /// Default is false.
-        pub map_varchar_to_utf8view: bool, default = false
+        pub map_varchar_to_utf8view: bool, default = true
 
         /// When set to true, the source locations relative to the original SQL
         /// query (i.e. 
[`Span`](https://docs.rs/sqlparser/latest/sqlparser/tokenizer/struct.Span.html))
 will be collected
diff --git a/datafusion/core/tests/dataframe/mod.rs 
b/datafusion/core/tests/dataframe/mod.rs
index dfd11fcb09..089ff88081 100644
--- a/datafusion/core/tests/dataframe/mod.rs
+++ b/datafusion/core/tests/dataframe/mod.rs
@@ -2505,6 +2505,11 @@ async fn write_table_with_order() -> Result<()> {
     write_df = write_df
         .with_column_renamed("column1", "tablecol1")
         .unwrap();
+
+    // Ensure the column type matches the target table
+    write_df =
+        write_df.with_column("tablecol1", cast(col("tablecol1"), 
DataType::Utf8View))?;
+
     let sql_str =
         "create external table data(tablecol1 varchar) stored as parquet 
location '"
             .to_owned()
diff --git a/datafusion/core/tests/sql/explain_analyze.rs 
b/datafusion/core/tests/sql/explain_analyze.rs
index 87e354959f..70e94227cf 100644
--- a/datafusion/core/tests/sql/explain_analyze.rs
+++ b/datafusion/core/tests/sql/explain_analyze.rs
@@ -176,9 +176,9 @@ async fn csv_explain_plans() {
     // Verify schema
     let expected = vec![
         "Explain [plan_type:Utf8, plan:Utf8]",
-        "  Projection: aggregate_test_100.c1 [c1:Utf8]",
-        "    Filter: aggregate_test_100.c2 > Int64(10) [c1:Utf8, c2:Int8, 
c3:Int16, c4:Int16, c5:Int32, c6:Int64, c7:Int16, c8:Int32, c9:UInt32, 
c10:UInt64, c11:Float32, c12:Float64, c13:Utf8]",
-        "      TableScan: aggregate_test_100 [c1:Utf8, c2:Int8, c3:Int16, 
c4:Int16, c5:Int32, c6:Int64, c7:Int16, c8:Int32, c9:UInt32, c10:UInt64, 
c11:Float32, c12:Float64, c13:Utf8]",
+        "  Projection: aggregate_test_100.c1 [c1:Utf8View]",
+        "    Filter: aggregate_test_100.c2 > Int64(10) [c1:Utf8View, c2:Int8, 
c3:Int16, c4:Int16, c5:Int32, c6:Int64, c7:Int16, c8:Int32, c9:UInt32, 
c10:UInt64, c11:Float32, c12:Float64, c13:Utf8View]",
+        "      TableScan: aggregate_test_100 [c1:Utf8View, c2:Int8, c3:Int16, 
c4:Int16, c5:Int32, c6:Int64, c7:Int16, c8:Int32, c9:UInt32, c10:UInt64, 
c11:Float32, c12:Float64, c13:Utf8View]",
     ];
     let formatted = plan.display_indent_schema().to_string();
     let actual: Vec<&str> = formatted.trim().lines().collect();
@@ -222,11 +222,11 @@ async fn csv_explain_plans() {
         "  {",
         "    graph[label=\"Detailed LogicalPlan\"]",
         "    7[shape=box label=\"Explain\\nSchema: [plan_type:Utf8, 
plan:Utf8]\"]",
-        "    8[shape=box label=\"Projection: aggregate_test_100.c1\\nSchema: 
[c1:Utf8]\"]",
+        "    8[shape=box label=\"Projection: aggregate_test_100.c1\\nSchema: 
[c1:Utf8View]\"]",
         "    7 -> 8 [arrowhead=none, arrowtail=normal, dir=back]",
-        "    9[shape=box label=\"Filter: aggregate_test_100.c2 > 
Int64(10)\\nSchema: [c1:Utf8, c2:Int8, c3:Int16, c4:Int16, c5:Int32, c6:Int64, 
c7:Int16, c8:Int32, c9:UInt32, c10:UInt64, c11:Float32, c12:Float64, 
c13:Utf8]\"]",
+        "    9[shape=box label=\"Filter: aggregate_test_100.c2 > 
Int64(10)\\nSchema: [c1:Utf8View, c2:Int8, c3:Int16, c4:Int16, c5:Int32, 
c6:Int64, c7:Int16, c8:Int32, c9:UInt32, c10:UInt64, c11:Float32, c12:Float64, 
c13:Utf8View]\"]",
         "    8 -> 9 [arrowhead=none, arrowtail=normal, dir=back]",
-        "    10[shape=box label=\"TableScan: aggregate_test_100\\nSchema: 
[c1:Utf8, c2:Int8, c3:Int16, c4:Int16, c5:Int32, c6:Int64, c7:Int16, c8:Int32, 
c9:UInt32, c10:UInt64, c11:Float32, c12:Float64, c13:Utf8]\"]",
+        "    10[shape=box label=\"TableScan: aggregate_test_100\\nSchema: 
[c1:Utf8View, c2:Int8, c3:Int16, c4:Int16, c5:Int32, c6:Int64, c7:Int16, 
c8:Int32, c9:UInt32, c10:UInt64, c11:Float32, c12:Float64, c13:Utf8View]\"]",
         "    9 -> 10 [arrowhead=none, arrowtail=normal, dir=back]",
         "  }",
         "}",
@@ -250,9 +250,9 @@ async fn csv_explain_plans() {
     // Verify schema
     let expected = vec![
         "Explain [plan_type:Utf8, plan:Utf8]",
-        "  Projection: aggregate_test_100.c1 [c1:Utf8]",
-        "    Filter: aggregate_test_100.c2 > Int8(10) [c1:Utf8, c2:Int8]",
-        "      TableScan: aggregate_test_100 projection=[c1, c2], 
partial_filters=[aggregate_test_100.c2 > Int8(10)] [c1:Utf8, c2:Int8]",
+        "  Projection: aggregate_test_100.c1 [c1:Utf8View]",
+        "    Filter: aggregate_test_100.c2 > Int8(10) [c1:Utf8View, c2:Int8]",
+        "      TableScan: aggregate_test_100 projection=[c1, c2], 
partial_filters=[aggregate_test_100.c2 > Int8(10)] [c1:Utf8View, c2:Int8]",
     ];
     let formatted = plan.display_indent_schema().to_string();
     let actual: Vec<&str> = formatted.trim().lines().collect();
@@ -296,11 +296,11 @@ async fn csv_explain_plans() {
         "  {",
         "    graph[label=\"Detailed LogicalPlan\"]",
         "    7[shape=box label=\"Explain\\nSchema: [plan_type:Utf8, 
plan:Utf8]\"]",
-        "    8[shape=box label=\"Projection: aggregate_test_100.c1\\nSchema: 
[c1:Utf8]\"]",
+        "    8[shape=box label=\"Projection: aggregate_test_100.c1\\nSchema: 
[c1:Utf8View]\"]",
         "    7 -> 8 [arrowhead=none, arrowtail=normal, dir=back]",
-        "    9[shape=box label=\"Filter: aggregate_test_100.c2 > 
Int8(10)\\nSchema: [c1:Utf8, c2:Int8]\"]",
+        "    9[shape=box label=\"Filter: aggregate_test_100.c2 > 
Int8(10)\\nSchema: [c1:Utf8View, c2:Int8]\"]",
         "    8 -> 9 [arrowhead=none, arrowtail=normal, dir=back]",
-        "    10[shape=box label=\"TableScan: aggregate_test_100 
projection=[c1, c2], partial_filters=[aggregate_test_100.c2 > 
Int8(10)]\\nSchema: [c1:Utf8, c2:Int8]\"]",
+        "    10[shape=box label=\"TableScan: aggregate_test_100 
projection=[c1, c2], partial_filters=[aggregate_test_100.c2 > 
Int8(10)]\\nSchema: [c1:Utf8View, c2:Int8]\"]",
         "    9 -> 10 [arrowhead=none, arrowtail=normal, dir=back]",
         "  }",
         "}",
@@ -398,9 +398,9 @@ async fn csv_explain_verbose_plans() {
     // Verify schema
     let expected = vec![
         "Explain [plan_type:Utf8, plan:Utf8]",
-        "  Projection: aggregate_test_100.c1 [c1:Utf8]",
-        "    Filter: aggregate_test_100.c2 > Int64(10) [c1:Utf8, c2:Int8, 
c3:Int16, c4:Int16, c5:Int32, c6:Int64, c7:Int16, c8:Int32, c9:UInt32, 
c10:UInt64, c11:Float32, c12:Float64, c13:Utf8]",
-        "      TableScan: aggregate_test_100 [c1:Utf8, c2:Int8, c3:Int16, 
c4:Int16, c5:Int32, c6:Int64, c7:Int16, c8:Int32, c9:UInt32, c10:UInt64, 
c11:Float32, c12:Float64, c13:Utf8]",
+        "  Projection: aggregate_test_100.c1 [c1:Utf8View]",
+        "    Filter: aggregate_test_100.c2 > Int64(10) [c1:Utf8View, c2:Int8, 
c3:Int16, c4:Int16, c5:Int32, c6:Int64, c7:Int16, c8:Int32, c9:UInt32, 
c10:UInt64, c11:Float32, c12:Float64, c13:Utf8View]",
+        "      TableScan: aggregate_test_100 [c1:Utf8View, c2:Int8, c3:Int16, 
c4:Int16, c5:Int32, c6:Int64, c7:Int16, c8:Int32, c9:UInt32, c10:UInt64, 
c11:Float32, c12:Float64, c13:Utf8View]",
     ];
     let formatted = 
dataframe.logical_plan().display_indent_schema().to_string();
     let actual: Vec<&str> = formatted.trim().lines().collect();
@@ -444,11 +444,11 @@ async fn csv_explain_verbose_plans() {
         "  {",
         "    graph[label=\"Detailed LogicalPlan\"]",
         "    7[shape=box label=\"Explain\\nSchema: [plan_type:Utf8, 
plan:Utf8]\"]",
-        "    8[shape=box label=\"Projection: aggregate_test_100.c1\\nSchema: 
[c1:Utf8]\"]",
+        "    8[shape=box label=\"Projection: aggregate_test_100.c1\\nSchema: 
[c1:Utf8View]\"]",
         "    7 -> 8 [arrowhead=none, arrowtail=normal, dir=back]",
-        "    9[shape=box label=\"Filter: aggregate_test_100.c2 > 
Int64(10)\\nSchema: [c1:Utf8, c2:Int8, c3:Int16, c4:Int16, c5:Int32, c6:Int64, 
c7:Int16, c8:Int32, c9:UInt32, c10:UInt64, c11:Float32, c12:Float64, 
c13:Utf8]\"]",
+        "    9[shape=box label=\"Filter: aggregate_test_100.c2 > 
Int64(10)\\nSchema: [c1:Utf8View, c2:Int8, c3:Int16, c4:Int16, c5:Int32, 
c6:Int64, c7:Int16, c8:Int32, c9:UInt32, c10:UInt64, c11:Float32, c12:Float64, 
c13:Utf8View]\"]",
         "    8 -> 9 [arrowhead=none, arrowtail=normal, dir=back]",
-        "    10[shape=box label=\"TableScan: aggregate_test_100\\nSchema: 
[c1:Utf8, c2:Int8, c3:Int16, c4:Int16, c5:Int32, c6:Int64, c7:Int16, c8:Int32, 
c9:UInt32, c10:UInt64, c11:Float32, c12:Float64, c13:Utf8]\"]",
+        "    10[shape=box label=\"TableScan: aggregate_test_100\\nSchema: 
[c1:Utf8View, c2:Int8, c3:Int16, c4:Int16, c5:Int32, c6:Int64, c7:Int16, 
c8:Int32, c9:UInt32, c10:UInt64, c11:Float32, c12:Float64, c13:Utf8View]\"]",
         "    9 -> 10 [arrowhead=none, arrowtail=normal, dir=back]",
         "  }",
         "}",
@@ -472,9 +472,9 @@ async fn csv_explain_verbose_plans() {
     // Verify schema
     let expected = vec![
         "Explain [plan_type:Utf8, plan:Utf8]",
-        "  Projection: aggregate_test_100.c1 [c1:Utf8]",
-        "    Filter: aggregate_test_100.c2 > Int8(10) [c1:Utf8, c2:Int8]",
-        "      TableScan: aggregate_test_100 projection=[c1, c2], 
partial_filters=[aggregate_test_100.c2 > Int8(10)] [c1:Utf8, c2:Int8]",
+        "  Projection: aggregate_test_100.c1 [c1:Utf8View]",
+        "    Filter: aggregate_test_100.c2 > Int8(10) [c1:Utf8View, c2:Int8]",
+        "      TableScan: aggregate_test_100 projection=[c1, c2], 
partial_filters=[aggregate_test_100.c2 > Int8(10)] [c1:Utf8View, c2:Int8]",
     ];
     let formatted = plan.display_indent_schema().to_string();
     let actual: Vec<&str> = formatted.trim().lines().collect();
@@ -518,11 +518,11 @@ async fn csv_explain_verbose_plans() {
         "  {",
         "    graph[label=\"Detailed LogicalPlan\"]",
         "    7[shape=box label=\"Explain\\nSchema: [plan_type:Utf8, 
plan:Utf8]\"]",
-        "    8[shape=box label=\"Projection: aggregate_test_100.c1\\nSchema: 
[c1:Utf8]\"]",
+        "    8[shape=box label=\"Projection: aggregate_test_100.c1\\nSchema: 
[c1:Utf8View]\"]",
         "    7 -> 8 [arrowhead=none, arrowtail=normal, dir=back]",
-        "    9[shape=box label=\"Filter: aggregate_test_100.c2 > 
Int8(10)\\nSchema: [c1:Utf8, c2:Int8]\"]",
+        "    9[shape=box label=\"Filter: aggregate_test_100.c2 > 
Int8(10)\\nSchema: [c1:Utf8View, c2:Int8]\"]",
         "    8 -> 9 [arrowhead=none, arrowtail=normal, dir=back]",
-        "    10[shape=box label=\"TableScan: aggregate_test_100 
projection=[c1, c2], partial_filters=[aggregate_test_100.c2 > 
Int8(10)]\\nSchema: [c1:Utf8, c2:Int8]\"]",
+        "    10[shape=box label=\"TableScan: aggregate_test_100 
projection=[c1, c2], partial_filters=[aggregate_test_100.c2 > 
Int8(10)]\\nSchema: [c1:Utf8View, c2:Int8]\"]",
         "    9 -> 10 [arrowhead=none, arrowtail=normal, dir=back]",
         "  }",
         "}",
diff --git a/datafusion/core/tests/user_defined/user_defined_plan.rs 
b/datafusion/core/tests/user_defined/user_defined_plan.rs
index e46940e631..b68ef6aca0 100644
--- a/datafusion/core/tests/user_defined/user_defined_plan.rs
+++ b/datafusion/core/tests/user_defined/user_defined_plan.rs
@@ -63,15 +63,14 @@ use std::hash::Hash;
 use std::task::{Context, Poll};
 use std::{any::Any, collections::BTreeMap, fmt, sync::Arc};
 
+use arrow::array::{Array, ArrayRef, StringViewArray};
 use arrow::{
-    array::{Int64Array, StringArray},
-    datatypes::SchemaRef,
-    record_batch::RecordBatch,
+    array::Int64Array, datatypes::SchemaRef, record_batch::RecordBatch,
     util::pretty::pretty_format_batches,
 };
 use datafusion::execution::session_state::SessionStateBuilder;
 use datafusion::{
-    common::cast::{as_int64_array, as_string_array},
+    common::cast::as_int64_array,
     common::{arrow_datafusion_err, internal_err, DFSchemaRef},
     error::{DataFusionError, Result},
     execution::{
@@ -100,6 +99,7 @@ use datafusion_optimizer::AnalyzerRule;
 use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
 
 use async_trait::async_trait;
+use datafusion_common::cast::as_string_view_array;
 use futures::{Stream, StreamExt};
 
 /// Execute the specified sql and return the resulting record batches
@@ -796,22 +796,26 @@ fn accumulate_batch(
     k: &usize,
 ) -> BTreeMap<i64, String> {
     let num_rows = input_batch.num_rows();
+
     // Assuming the input columns are
-    // column[0]: customer_id / UTF8
+    // column[0]: customer_id UTF8View
     // column[1]: revenue: Int64
-    let customer_id =
-        as_string_array(input_batch.column(0)).expect("Column 0 is not 
customer_id");
 
+    let customer_id_column = input_batch.column(0);
     let revenue = as_int64_array(input_batch.column(1)).unwrap();
 
     for row in 0..num_rows {
-        add_row(
-            &mut top_values,
-            customer_id.value(row),
-            revenue.value(row),
-            k,
-        );
+        let customer_id = match customer_id_column.data_type() {
+            arrow::datatypes::DataType::Utf8View => {
+                let array = as_string_view_array(customer_id_column).unwrap();
+                array.value(row)
+            }
+            _ => panic!("Unsupported customer_id type"),
+        };
+
+        add_row(&mut top_values, customer_id, revenue.value(row), k);
     }
+
     top_values
 }
 
@@ -843,11 +847,19 @@ impl Stream for TopKReader {
                     self.state.iter().rev().unzip();
 
                 let customer: Vec<&str> = customer.iter().map(|&s| 
&**s).collect();
+
+                let customer_array: ArrayRef = match 
schema.field(0).data_type() {
+                    arrow::datatypes::DataType::Utf8View => {
+                        Arc::new(StringViewArray::from(customer))
+                    }
+                    other => panic!("Unsupported customer_id output type: 
{other:?}"),
+                };
+
                 Poll::Ready(Some(
                     RecordBatch::try_new(
                         schema,
                         vec![
-                            Arc::new(StringArray::from(customer)),
+                            Arc::new(customer_array),
                             Arc::new(Int64Array::from(revenue)),
                         ],
                     )
diff --git a/datafusion/functions-aggregate/src/string_agg.rs 
b/datafusion/functions-aggregate/src/string_agg.rs
index 3f7e503acf..4682e574bf 100644
--- a/datafusion/functions-aggregate/src/string_agg.rs
+++ b/datafusion/functions-aggregate/src/string_agg.rs
@@ -20,7 +20,7 @@
 use crate::array_agg::ArrayAgg;
 use arrow::array::ArrayRef;
 use arrow::datatypes::{DataType, Field, FieldRef};
-use datafusion_common::cast::as_generic_string_array;
+use datafusion_common::cast::{as_generic_string_array, as_string_view_array};
 use datafusion_common::Result;
 use datafusion_common::{internal_err, not_impl_err, ScalarValue};
 use datafusion_expr::function::AccumulatorArgs;
@@ -95,9 +95,15 @@ impl StringAgg {
                     TypeSignature::Exact(vec![DataType::LargeUtf8, 
DataType::Utf8]),
                     TypeSignature::Exact(vec![DataType::LargeUtf8, 
DataType::LargeUtf8]),
                     TypeSignature::Exact(vec![DataType::LargeUtf8, 
DataType::Null]),
+                    TypeSignature::Exact(vec![DataType::LargeUtf8, 
DataType::Utf8View]),
                     TypeSignature::Exact(vec![DataType::Utf8, DataType::Utf8]),
                     TypeSignature::Exact(vec![DataType::Utf8, 
DataType::LargeUtf8]),
                     TypeSignature::Exact(vec![DataType::Utf8, DataType::Null]),
+                    TypeSignature::Exact(vec![DataType::Utf8, 
DataType::Utf8View]),
+                    TypeSignature::Exact(vec![DataType::Utf8View, 
DataType::Utf8View]),
+                    TypeSignature::Exact(vec![DataType::Utf8View, 
DataType::LargeUtf8]),
+                    TypeSignature::Exact(vec![DataType::Utf8View, 
DataType::Null]),
+                    TypeSignature::Exact(vec![DataType::Utf8View, 
DataType::Utf8]),
                 ],
                 Volatility::Immutable,
             ),
@@ -211,6 +217,10 @@ impl Accumulator for StringAggAccumulator {
                 .iter()
                 .flatten()
                 .collect(),
+            DataType::Utf8View => as_string_view_array(list.values())?
+                .iter()
+                .flatten()
+                .collect(),
             _ => {
                 return internal_err!(
                     "Expected elements to of type Utf8 or LargeUtf8, but got 
{}",
diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs
index 73d136d7d1..5a1f3cdf69 100644
--- a/datafusion/sql/src/planner.rs
+++ b/datafusion/sql/src/planner.rs
@@ -72,7 +72,7 @@ impl ParserOptions {
             parse_float_as_decimal: false,
             enable_ident_normalization: true,
             support_varchar_with_length: true,
-            map_varchar_to_utf8view: false,
+            map_varchar_to_utf8view: true,
             enable_options_value_normalization: false,
             collect_spans: false,
         }
diff --git a/datafusion/sql/tests/sql_integration.rs 
b/datafusion/sql/tests/sql_integration.rs
index 365012b7f6..4be7953aef 100644
--- a/datafusion/sql/tests/sql_integration.rs
+++ b/datafusion/sql/tests/sql_integration.rs
@@ -3355,7 +3355,7 @@ fn parse_decimals_parser_options() -> ParserOptions {
         parse_float_as_decimal: true,
         enable_ident_normalization: false,
         support_varchar_with_length: false,
-        map_varchar_to_utf8view: false,
+        map_varchar_to_utf8view: true,
         enable_options_value_normalization: false,
         collect_spans: false,
     }
@@ -3366,7 +3366,7 @@ fn 
ident_normalization_parser_options_no_ident_normalization() -> ParserOptions
         parse_float_as_decimal: true,
         enable_ident_normalization: false,
         support_varchar_with_length: false,
-        map_varchar_to_utf8view: false,
+        map_varchar_to_utf8view: true,
         enable_options_value_normalization: false,
         collect_spans: false,
     }
@@ -3377,7 +3377,7 @@ fn 
ident_normalization_parser_options_ident_normalization() -> ParserOptions {
         parse_float_as_decimal: true,
         enable_ident_normalization: true,
         support_varchar_with_length: false,
-        map_varchar_to_utf8view: false,
+        map_varchar_to_utf8view: true,
         enable_options_value_normalization: false,
         collect_spans: false,
     }
diff --git a/datafusion/sqllogictest/src/engines/datafusion_engine/normalize.rs 
b/datafusion/sqllogictest/src/engines/datafusion_engine/normalize.rs
index b3396bbd06..0d832bb306 100644
--- a/datafusion/sqllogictest/src/engines/datafusion_engine/normalize.rs
+++ b/datafusion/sqllogictest/src/engines/datafusion_engine/normalize.rs
@@ -292,7 +292,9 @@ pub fn convert_schema_to_types(columns: &Fields) -> 
Vec<DFColumnType> {
                 if key_type.is_integer() {
                     // mapping dictionary string types to Text
                     match value_type.as_ref() {
-                        DataType::Utf8 | DataType::LargeUtf8 => 
DFColumnType::Text,
+                        DataType::Utf8 | DataType::LargeUtf8 | 
DataType::Utf8View => {
+                            DFColumnType::Text
+                        }
                         _ => DFColumnType::Another,
                     }
                 } else {
diff --git a/datafusion/sqllogictest/test_files/aggregate.slt 
b/datafusion/sqllogictest/test_files/aggregate.slt
index 002014827e..52b1e1c22f 100644
--- a/datafusion/sqllogictest/test_files/aggregate.slt
+++ b/datafusion/sqllogictest/test_files/aggregate.slt
@@ -132,21 +132,34 @@ statement error DataFusion error: Schema error: Schema 
contains duplicate unqual
 SELECT approx_distinct(c9) count_c9, approx_distinct(cast(c9 as varchar)) 
count_c9_str FROM aggregate_test_100
 
 # csv_query_approx_percentile_cont_with_weight
-statement error DataFusion error: Error during planning: Failed to coerce 
arguments to satisfy a call to 'approx_percentile_cont_with_weight' function: 
coercion from \[Utf8, Int8, Float64\] to the signature OneOf(.*) failed(.|\n)*
+statement error
 SELECT approx_percentile_cont_with_weight(c2, 0.95) WITHIN GROUP (ORDER BY c1) 
FROM aggregate_test_100
+----
+DataFusion error: Error during planning: Failed to coerce arguments to satisfy 
a call to 'approx_percentile_cont_with_weight' function: coercion from 
[Utf8View, Int8, Float64] to the signature OneOf([Exact([Int8, Int8, Float64]), 
Exact([Int16, Int16, Float64]), Exact([Int32, Int32, Float64]), Exact([Int64, 
Int64, Float64]), Exact([UInt8, UInt8, Float64]), Exact([UInt16, UInt16, 
Float64]), Exact([UInt32, UInt32, Float64]), Exact([UInt64, UInt64, Float64]), 
Exact([Float32, Float32, Float64 [...]
 
-statement error DataFusion error: Error during planning: Failed to coerce 
arguments to satisfy a call to 'approx_percentile_cont_with_weight' function: 
coercion from \[Int16, Utf8, Float64\] to the signature OneOf(.*) failed(.|\n)*
+
+statement error
 SELECT approx_percentile_cont_with_weight(c1, 0.95) WITHIN GROUP (ORDER BY c3) 
FROM aggregate_test_100
+----
+DataFusion error: Error during planning: Failed to coerce arguments to satisfy 
a call to 'approx_percentile_cont_with_weight' function: coercion from [Int16, 
Utf8View, Float64] to the signature OneOf([Exact([Int8, Int8, Float64]), 
Exact([Int16, Int16, Float64]), Exact([Int32, Int32, Float64]), Exact([Int64, 
Int64, Float64]), Exact([UInt8, UInt8, Float64]), Exact([UInt16, UInt16, 
Float64]), Exact([UInt32, UInt32, Float64]), Exact([UInt64, UInt64, Float64]), 
Exact([Float32, Float32, Float6 [...]
+
 
-statement error DataFusion error: Error during planning: Failed to coerce 
arguments to satisfy a call to 'approx_percentile_cont_with_weight' function: 
coercion from \[Int16, Int8, Utf8\] to the signature OneOf(.*) failed(.|\n)*
+statement error
 SELECT approx_percentile_cont_with_weight(c2, c1) WITHIN GROUP (ORDER BY c3) 
FROM aggregate_test_100
+----
+DataFusion error: Error during planning: Failed to coerce arguments to satisfy 
a call to 'approx_percentile_cont_with_weight' function: coercion from [Int16, 
Int8, Utf8View] to the signature OneOf([Exact([Int8, Int8, Float64]), 
Exact([Int16, Int16, Float64]), Exact([Int32, Int32, Float64]), Exact([Int64, 
Int64, Float64]), Exact([UInt8, UInt8, Float64]), Exact([UInt16, UInt16, 
Float64]), Exact([UInt32, UInt32, Float64]), Exact([UInt64, UInt64, Float64]), 
Exact([Float32, Float32, Float64]) [...]
+
 
 # csv_query_approx_percentile_cont_with_histogram_bins
 statement error DataFusion error: This feature is not implemented: Tdigest 
max_size value for 'APPROX_PERCENTILE_CONT' must be UInt > 0 literal \(got data 
type Int64\)\.
 SELECT c1, approx_percentile_cont(0.95, -1000) WITHIN GROUP (ORDER BY c3) AS 
c3_p95 FROM aggregate_test_100 GROUP BY 1 ORDER BY 1
 
-statement error DataFusion error: Error during planning: Failed to coerce 
arguments to satisfy a call to 'approx_percentile_cont' function: coercion from 
\[Int16, Float64, Utf8\] to the signature OneOf(.*) failed(.|\n)*
+statement error
 SELECT approx_percentile_cont(0.95, c1) WITHIN GROUP (ORDER BY c3) FROM 
aggregate_test_100
+----
+DataFusion error: Error during planning: Failed to coerce arguments to satisfy 
a call to 'approx_percentile_cont' function: coercion from [Int16, Float64, 
Utf8View] to the signature OneOf([Exact([Int8, Float64]), Exact([Int8, Float64, 
Int8]), Exact([Int8, Float64, Int16]), Exact([Int8, Float64, Int32]), 
Exact([Int8, Float64, Int64]), Exact([Int8, Float64, UInt8]), Exact([Int8, 
Float64, UInt16]), Exact([Int8, Float64, UInt32]), Exact([Int8, Float64, 
UInt64]), Exact([Int16, Float64]), Exac [...]
+
+
 
 statement error DataFusion error: Error during planning: Failed to coerce 
arguments to satisfy a call to 'approx_percentile_cont' function: coercion from 
\[Int16, Float64, Float64\] to the signature OneOf(.*) failed(.|\n)*
 SELECT approx_percentile_cont(0.95, 111.1) WITHIN GROUP (ORDER BY c3) FROM 
aggregate_test_100
diff --git a/datafusion/sqllogictest/test_files/avro.slt 
b/datafusion/sqllogictest/test_files/avro.slt
index 6cfebfe5b6..4573af1d59 100644
--- a/datafusion/sqllogictest/test_files/avro.slt
+++ b/datafusion/sqllogictest/test_files/avro.slt
@@ -15,6 +15,10 @@
 # specific language governing permissions and limitations
 # under the License.
 
+# Currently, the avro not support Utf8View type, so we disable the 
map_varchar_to_utf8view
+# After https://github.com/apache/arrow-rs/issues/7262 released, we can remove 
this setting
+statement ok
+set datafusion.sql_parser.map_varchar_to_utf8view = false;
 
 statement ok
 CREATE EXTERNAL TABLE alltypes_plain (
diff --git a/datafusion/sqllogictest/test_files/coalesce.slt 
b/datafusion/sqllogictest/test_files/coalesce.slt
index e7cf31dc69..9740bade5e 100644
--- a/datafusion/sqllogictest/test_files/coalesce.slt
+++ b/datafusion/sqllogictest/test_files/coalesce.slt
@@ -260,8 +260,8 @@ select
   arrow_typeof(coalesce(c, arrow_cast('b', 'Dictionary(Int32, Utf8)')))
 from t;
 ----
-a Dictionary(Int32, Utf8)
-b Dictionary(Int32, Utf8)
+a Utf8View
+b Utf8View
 
 statement ok
 drop table t;
diff --git a/datafusion/sqllogictest/test_files/ddl.slt 
b/datafusion/sqllogictest/test_files/ddl.slt
index 088d0155a6..1e95e426f3 100644
--- a/datafusion/sqllogictest/test_files/ddl.slt
+++ b/datafusion/sqllogictest/test_files/ddl.slt
@@ -819,7 +819,7 @@ show columns FROM table_with_pk;
 ----
 datafusion public table_with_pk sn Int32 NO
 datafusion public table_with_pk ts Timestamp(Nanosecond, Some("+00:00")) NO
-datafusion public table_with_pk currency Utf8 NO
+datafusion public table_with_pk currency Utf8View NO
 datafusion public table_with_pk amount Float32 YES
 
 statement ok
@@ -835,8 +835,8 @@ CREATE TABLE t1(c1 VARCHAR(10) NOT NULL, c2 VARCHAR);
 query TTT
 DESCRIBE t1;
 ----
-c1 Utf8 NO
-c2 Utf8 YES
+c1 Utf8View NO
+c2 Utf8View YES
 
 statement ok
 set datafusion.sql_parser.map_varchar_to_utf8view = true;
diff --git a/datafusion/sqllogictest/test_files/delete.slt 
b/datafusion/sqllogictest/test_files/delete.slt
index d096aa9f43..258318f094 100644
--- a/datafusion/sqllogictest/test_files/delete.slt
+++ b/datafusion/sqllogictest/test_files/delete.slt
@@ -37,13 +37,13 @@ logical_plan
 physical_plan_error This feature is not implemented: Unsupported logical plan: 
Dml(Delete)
 
 
-# Filtered by existing columns 
+# Filtered by existing columns
 query TT
 explain delete from t1 where a = 1 and b = 2 and c > 3 and d != 4;
 ----
 logical_plan
 01)Dml: op=[Delete] table=[t1]
-02)--Filter: CAST(t1.a AS Int64) = Int64(1) AND t1.b = CAST(Int64(2) AS Utf8) 
AND t1.c > CAST(Int64(3) AS Float64) AND CAST(t1.d AS Int64) != Int64(4)
+02)--Filter: CAST(t1.a AS Int64) = Int64(1) AND t1.b = CAST(Int64(2) AS 
Utf8View) AND t1.c > CAST(Int64(3) AS Float64) AND CAST(t1.d AS Int64) != 
Int64(4)
 03)----TableScan: t1
 physical_plan_error This feature is not implemented: Unsupported logical plan: 
Dml(Delete)
 
@@ -54,7 +54,7 @@ explain delete from t1 where t1.a = 1 and b = 2 and t1.c > 3 
and d != 4;
 ----
 logical_plan
 01)Dml: op=[Delete] table=[t1]
-02)--Filter: CAST(t1.a AS Int64) = Int64(1) AND t1.b = CAST(Int64(2) AS Utf8) 
AND t1.c > CAST(Int64(3) AS Float64) AND CAST(t1.d AS Int64) != Int64(4)
+02)--Filter: CAST(t1.a AS Int64) = Int64(1) AND t1.b = CAST(Int64(2) AS 
Utf8View) AND t1.c > CAST(Int64(3) AS Float64) AND CAST(t1.d AS Int64) != 
Int64(4)
 03)----TableScan: t1
 physical_plan_error This feature is not implemented: Unsupported logical plan: 
Dml(Delete)
 
diff --git a/datafusion/sqllogictest/test_files/filter_without_sort_exec.slt 
b/datafusion/sqllogictest/test_files/filter_without_sort_exec.slt
index d96044fda8..a09d8ce26d 100644
--- a/datafusion/sqllogictest/test_files/filter_without_sort_exec.slt
+++ b/datafusion/sqllogictest/test_files/filter_without_sort_exec.slt
@@ -34,7 +34,7 @@ ORDER BY "date", "time";
 ----
 logical_plan
 01)Sort: data.date ASC NULLS LAST, data.time ASC NULLS LAST
-02)--Filter: data.ticker = Utf8("A")
+02)--Filter: data.ticker = Utf8View("A")
 03)----TableScan: data projection=[date, ticker, time]
 physical_plan
 01)SortPreservingMergeExec: [date@0 ASC NULLS LAST, time@2 ASC NULLS LAST]
@@ -51,7 +51,7 @@ ORDER BY "time"
 ----
 logical_plan
 01)Sort: data.time ASC NULLS LAST
-02)--Filter: data.ticker = Utf8("A") AND CAST(data.time AS Date32) = data.date
+02)--Filter: data.ticker = Utf8View("A") AND CAST(data.time AS Date32) = 
data.date
 03)----TableScan: data projection=[date, ticker, time]
 physical_plan
 01)SortPreservingMergeExec: [time@2 ASC NULLS LAST]
@@ -68,7 +68,7 @@ ORDER BY "date"
 ----
 logical_plan
 01)Sort: data.date ASC NULLS LAST
-02)--Filter: data.ticker = Utf8("A") AND CAST(data.time AS Date32) = data.date
+02)--Filter: data.ticker = Utf8View("A") AND CAST(data.time AS Date32) = 
data.date
 03)----TableScan: data projection=[date, ticker, time]
 physical_plan
 01)SortPreservingMergeExec: [date@0 ASC NULLS LAST]
@@ -85,7 +85,7 @@ ORDER BY "ticker"
 ----
 logical_plan
 01)Sort: data.ticker ASC NULLS LAST
-02)--Filter: data.ticker = Utf8("A") AND CAST(data.time AS Date32) = data.date
+02)--Filter: data.ticker = Utf8View("A") AND CAST(data.time AS Date32) = 
data.date
 03)----TableScan: data projection=[date, ticker, time]
 physical_plan
 01)CoalescePartitionsExec
@@ -102,7 +102,7 @@ ORDER BY "time", "date";
 ----
 logical_plan
 01)Sort: data.time ASC NULLS LAST, data.date ASC NULLS LAST
-02)--Filter: data.ticker = Utf8("A") AND CAST(data.time AS Date32) = data.date
+02)--Filter: data.ticker = Utf8View("A") AND CAST(data.time AS Date32) = 
data.date
 03)----TableScan: data projection=[date, ticker, time]
 physical_plan
 01)SortPreservingMergeExec: [time@2 ASC NULLS LAST, date@0 ASC NULLS LAST]
@@ -120,7 +120,7 @@ ORDER BY "time"
 ----
 logical_plan
 01)Sort: data.time ASC NULLS LAST
-02)--Filter: data.ticker = Utf8("A") AND CAST(data.time AS Date32) != data.date
+02)--Filter: data.ticker = Utf8View("A") AND CAST(data.time AS Date32) != 
data.date
 03)----TableScan: data projection=[date, ticker, time]
 
 # no relation between time & date
@@ -132,7 +132,7 @@ ORDER BY "time"
 ----
 logical_plan
 01)Sort: data.time ASC NULLS LAST
-02)--Filter: data.ticker = Utf8("A")
+02)--Filter: data.ticker = Utf8View("A")
 03)----TableScan: data projection=[date, ticker, time]
 
 # query
diff --git a/datafusion/sqllogictest/test_files/imdb.slt 
b/datafusion/sqllogictest/test_files/imdb.slt
index 412e15f680..c17f9c47c7 100644
--- a/datafusion/sqllogictest/test_files/imdb.slt
+++ b/datafusion/sqllogictest/test_files/imdb.slt
@@ -1339,7 +1339,7 @@ WHERE k.keyword  like '%sequel%'
 ----
 Avengers: Endgame
 
-# 4a - Query with certain actor names 
+# 4a - Query with certain actor names
 query TT
 SELECT MIN(mi_idx.info) AS rating, MIN(t.title) AS movie_title
 FROM info_type AS it, keyword AS k, movie_info_idx AS mi_idx, movie_keyword AS 
mk, title AS t 
diff --git a/datafusion/sqllogictest/test_files/information_schema.slt 
b/datafusion/sqllogictest/test_files/information_schema.slt
index 841b289e75..108c844f20 100644
--- a/datafusion/sqllogictest/test_files/information_schema.slt
+++ b/datafusion/sqllogictest/test_files/information_schema.slt
@@ -307,7 +307,7 @@ datafusion.sql_parser.collect_spans false
 datafusion.sql_parser.dialect generic
 datafusion.sql_parser.enable_ident_normalization true
 datafusion.sql_parser.enable_options_value_normalization false
-datafusion.sql_parser.map_varchar_to_utf8view false
+datafusion.sql_parser.map_varchar_to_utf8view true
 datafusion.sql_parser.parse_float_as_decimal false
 datafusion.sql_parser.recursion_limit 50
 datafusion.sql_parser.support_varchar_with_length true
@@ -417,7 +417,7 @@ datafusion.sql_parser.collect_spans false When set to true, 
the source locations
 datafusion.sql_parser.dialect generic Configure the SQL dialect used by 
DataFusion's parser; supported values include: Generic, MySQL, PostgreSQL, 
Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, Ansi, DuckDB 
and Databricks.
 datafusion.sql_parser.enable_ident_normalization true When set to true, SQL 
parser will normalize ident (convert ident to lowercase when not quoted)
 datafusion.sql_parser.enable_options_value_normalization false When set to 
true, SQL parser will normalize options value (convert value to lowercase). 
Note that this option is ignored and will be removed in the future. All 
case-insensitive values are normalized automatically.
-datafusion.sql_parser.map_varchar_to_utf8view false If true, `VARCHAR` is 
mapped to `Utf8View` during SQL planning. If false, `VARCHAR` is mapped to 
`Utf8`  during SQL planning. Default is false.
+datafusion.sql_parser.map_varchar_to_utf8view true If true, `VARCHAR` is 
mapped to `Utf8View` during SQL planning. If false, `VARCHAR` is mapped to 
`Utf8`  during SQL planning. Default is false.
 datafusion.sql_parser.parse_float_as_decimal false When set to true, SQL 
parser will parse float as decimal type
 datafusion.sql_parser.recursion_limit 50 Specifies the recursion depth limit 
when parsing complex SQL Queries
 datafusion.sql_parser.support_varchar_with_length true If true, permit lengths 
for `VARCHAR` such as `VARCHAR(20)`, but ignore the length. If false, error if 
a `VARCHAR` with a length is specified. The Arrow type system does not have a 
notion of maximum string length and thus DataFusion can not enforce such limits.
diff --git a/datafusion/sqllogictest/test_files/join.slt.part 
b/datafusion/sqllogictest/test_files/join.slt.part
index 4c21b8a27d..19763ab008 100644
--- a/datafusion/sqllogictest/test_files/join.slt.part
+++ b/datafusion/sqllogictest/test_files/join.slt.part
@@ -842,7 +842,7 @@ LEFT JOIN department AS d
 ON (e.name = 'Alice' OR e.name = 'Bob');
 ----
 logical_plan
-01)Left Join:  Filter: e.name = Utf8("Alice") OR e.name = Utf8("Bob")
+01)Left Join:  Filter: e.name = Utf8View("Alice") OR e.name = Utf8View("Bob")
 02)--SubqueryAlias: e
 03)----TableScan: employees projection=[emp_id, name]
 04)--SubqueryAlias: d
@@ -929,7 +929,7 @@ ON (e.name = 'Alice' OR e.name = 'Bob');
 logical_plan
 01)Cross Join: 
 02)--SubqueryAlias: e
-03)----Filter: employees.name = Utf8("Alice") OR employees.name = Utf8("Bob")
+03)----Filter: employees.name = Utf8View("Alice") OR employees.name = 
Utf8View("Bob")
 04)------TableScan: employees projection=[emp_id, name]
 05)--SubqueryAlias: d
 06)----TableScan: department projection=[dept_name]
@@ -974,11 +974,11 @@ ON e.emp_id = d.emp_id
 WHERE ((dept_name != 'Engineering' AND e.name = 'Alice') OR (name != 'Alice' 
AND e.name = 'Carol'));
 ----
 logical_plan
-01)Filter: d.dept_name != Utf8("Engineering") AND e.name = Utf8("Alice") OR 
e.name != Utf8("Alice") AND e.name = Utf8("Carol")
+01)Filter: d.dept_name != Utf8View("Engineering") AND e.name = 
Utf8View("Alice") OR e.name != Utf8View("Alice") AND e.name = Utf8View("Carol")
 02)--Projection: e.emp_id, e.name, d.dept_name
 03)----Left Join: e.emp_id = d.emp_id
 04)------SubqueryAlias: e
-05)--------Filter: employees.name = Utf8("Alice") OR employees.name != 
Utf8("Alice") AND employees.name = Utf8("Carol")
+05)--------Filter: employees.name = Utf8View("Alice") OR employees.name != 
Utf8View("Alice") AND employees.name = Utf8View("Carol")
 06)----------TableScan: employees projection=[emp_id, name]
 07)------SubqueryAlias: d
 08)--------TableScan: department projection=[emp_id, dept_name]
diff --git a/datafusion/sqllogictest/test_files/joins.slt 
b/datafusion/sqllogictest/test_files/joins.slt
index b5189d16ec..ccecb94943 100644
--- a/datafusion/sqllogictest/test_files/joins.slt
+++ b/datafusion/sqllogictest/test_files/joins.slt
@@ -1067,9 +1067,9 @@ LEFT JOIN join_t2 on join_t1.t1_id = join_t2.t2_id
 WHERE join_t2.t2_int < 10 or (join_t1.t1_int > 2 and join_t2.t2_name != 'w')
 ----
 logical_plan
-01)Inner Join: join_t1.t1_id = join_t2.t2_id Filter: join_t2.t2_int < 
UInt32(10) OR join_t1.t1_int > UInt32(2) AND join_t2.t2_name != Utf8("w")
+01)Inner Join: join_t1.t1_id = join_t2.t2_id Filter: join_t2.t2_int < 
UInt32(10) OR join_t1.t1_int > UInt32(2) AND join_t2.t2_name != Utf8View("w")
 02)--TableScan: join_t1 projection=[t1_id, t1_name, t1_int]
-03)--Filter: join_t2.t2_int < UInt32(10) OR join_t2.t2_name != Utf8("w")
+03)--Filter: join_t2.t2_int < UInt32(10) OR join_t2.t2_name != Utf8View("w")
 04)----TableScan: join_t2 projection=[t2_id, t2_name, t2_int]
 
 # Reduce left join 3 (to inner join)
@@ -1153,7 +1153,7 @@ WHERE join_t1.t1_name != 'b'
 ----
 logical_plan
 01)Left Join: join_t1.t1_id = join_t2.t2_id
-02)--Filter: join_t1.t1_name != Utf8("b")
+02)--Filter: join_t1.t1_name != Utf8View("b")
 03)----TableScan: join_t1 projection=[t1_id, t1_name, t1_int]
 04)--TableScan: join_t2 projection=[t2_id, t2_name, t2_int]
 
@@ -1168,9 +1168,9 @@ WHERE join_t1.t1_name != 'b' and join_t2.t2_name = 'x'
 ----
 logical_plan
 01)Inner Join: join_t1.t1_id = join_t2.t2_id
-02)--Filter: join_t1.t1_name != Utf8("b")
+02)--Filter: join_t1.t1_name != Utf8View("b")
 03)----TableScan: join_t1 projection=[t1_id, t1_name, t1_int]
-04)--Filter: join_t2.t2_name = Utf8("x")
+04)--Filter: join_t2.t2_name = Utf8View("x")
 05)----TableScan: join_t2 projection=[t2_id, t2_name, t2_int]
 
 ###
@@ -4087,7 +4087,7 @@ logical_plan
 07)------------TableScan: sales_global projection=[ts, sn, amount, currency]
 08)----------SubqueryAlias: e
 09)------------Projection: exchange_rates.ts, exchange_rates.currency_from, 
exchange_rates.rate
-10)--------------Filter: exchange_rates.currency_to = Utf8("USD")
+10)--------------Filter: exchange_rates.currency_to = Utf8View("USD")
 11)----------------TableScan: exchange_rates projection=[ts, currency_from, 
currency_to, rate]
 physical_plan
 01)SortExec: expr=[sn@1 ASC NULLS LAST], preserve_partitioning=[false]
@@ -4644,7 +4644,7 @@ logical_plan
 08)----Subquery:
 09)------Filter: j3.j3_string = outer_ref(j2.j2_string)
 10)--------TableScan: j3 projection=[j3_string, j3_id]
-physical_plan_error This feature is not implemented: Physical plan does not 
support logical expression OuterReferenceColumn(Utf8, Column { relation: 
Some(Bare { table: "j2" }), name: "j2_string" })
+physical_plan_error This feature is not implemented: Physical plan does not 
support logical expression OuterReferenceColumn(Utf8View, Column { relation: 
Some(Bare { table: "j2" }), name: "j2_string" })
 
 query TT
 explain SELECT * FROM j1, LATERAL (SELECT * FROM j1, LATERAL (SELECT * FROM j2 
WHERE j1_id = j2_id) as j2) as j2;
diff --git a/datafusion/sqllogictest/test_files/min_max/fixed_size_list.slt 
b/datafusion/sqllogictest/test_files/min_max/fixed_size_list.slt
index 164daec228..aa623b63cd 100644
--- a/datafusion/sqllogictest/test_files/min_max/fixed_size_list.slt
+++ b/datafusion/sqllogictest/test_files/min_max/fixed_size_list.slt
@@ -131,4 +131,3 @@ SELECT max(column2) OVER (ORDER BY column1 ROWS BETWEEN 1 
PRECEDING AND 1 FOLLOW
 ----
 [4, 5]
 [4, 5]
-
diff --git a/datafusion/sqllogictest/test_files/min_max/large_list.slt 
b/datafusion/sqllogictest/test_files/min_max/large_list.slt
index 0dd7b5631b..44789e9dd7 100644
--- a/datafusion/sqllogictest/test_files/min_max/large_list.slt
+++ b/datafusion/sqllogictest/test_files/min_max/large_list.slt
@@ -141,8 +141,3 @@ create table max_window_different_column as (select
 from max_base_window_different_column);
 
 include ./queries.slt.part
-
-
-
-
-
diff --git a/datafusion/sqllogictest/test_files/min_max/list.slt 
b/datafusion/sqllogictest/test_files/min_max/list.slt
index da56b1a8ea..e63e8303c7 100644
--- a/datafusion/sqllogictest/test_files/min_max/list.slt
+++ b/datafusion/sqllogictest/test_files/min_max/list.slt
@@ -130,4 +130,3 @@ create table max_window_different_column as (
 ;
 
 include ./queries.slt.part
-
diff --git a/datafusion/sqllogictest/test_files/order.slt 
b/datafusion/sqllogictest/test_files/order.slt
index c8b0c379cc..3fc90a6459 100644
--- a/datafusion/sqllogictest/test_files/order.slt
+++ b/datafusion/sqllogictest/test_files/order.slt
@@ -1040,12 +1040,12 @@ limit 5;
 ----
 logical_plan
 01)Sort: c_str ASC NULLS LAST, fetch=5
-02)--Projection: CAST(ordered_table.c AS Utf8) AS c_str
+02)--Projection: CAST(ordered_table.c AS Utf8View) AS c_str
 03)----TableScan: ordered_table projection=[c]
 physical_plan
 01)SortPreservingMergeExec: [c_str@0 ASC NULLS LAST], fetch=5
 02)--SortExec: TopK(fetch=5), expr=[c_str@0 ASC NULLS LAST], 
preserve_partitioning=[true]
-03)----ProjectionExec: expr=[CAST(c@0 AS Utf8) as c_str]
+03)----ProjectionExec: expr=[CAST(c@0 AS Utf8View) as c_str]
 04)------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
 05)--------DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c], 
output_ordering=[c@0 ASC NULLS LAST], file_type=csv, has_header=true
 
diff --git a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt 
b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
index 01e0ad2fee..1b6ae13fbe 100644
--- a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
+++ b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
@@ -148,8 +148,8 @@ EXPLAIN select b from t_pushdown where a = 'bar' order by b;
 logical_plan
 01)Sort: t_pushdown.b ASC NULLS LAST
 02)--Projection: t_pushdown.b
-03)----Filter: t_pushdown.a = Utf8("bar")
-04)------TableScan: t_pushdown projection=[a, b], 
partial_filters=[t_pushdown.a = Utf8("bar")]
+03)----Filter: t_pushdown.a = Utf8View("bar")
+04)------TableScan: t_pushdown projection=[a, b], 
partial_filters=[t_pushdown.a = Utf8View("bar")]
 physical_plan
 01)SortPreservingMergeExec: [b@0 ASC NULLS LAST]
 02)--SortExec: expr=[b@0 ASC NULLS LAST], preserve_partitioning=[true]
diff --git a/datafusion/sqllogictest/test_files/predicates.slt 
b/datafusion/sqllogictest/test_files/predicates.slt
index b263e39f3b..b4b31fa78a 100644
--- a/datafusion/sqllogictest/test_files/predicates.slt
+++ b/datafusion/sqllogictest/test_files/predicates.slt
@@ -662,11 +662,11 @@ OR
 ----
 logical_plan
 01)Projection: lineitem.l_partkey
-02)--Inner Join: lineitem.l_partkey = part.p_partkey Filter: part.p_brand = 
Utf8("Brand#12") AND lineitem.l_quantity >= Decimal128(Some(100),15,2) AND 
lineitem.l_quantity <= Decimal128(Some(1100),15,2) AND part.p_size <= Int32(5) 
OR part.p_brand = Utf8("Brand#23") AND lineitem.l_quantity >= 
Decimal128(Some(1000),15,2) AND lineitem.l_quantity <= 
Decimal128(Some(2000),15,2) AND part.p_size <= Int32(10) OR part.p_brand = 
Utf8("Brand#34") AND lineitem.l_quantity >= Decimal128(Some(2000),15,2 [...]
+02)--Inner Join: lineitem.l_partkey = part.p_partkey Filter: part.p_brand = 
Utf8View("Brand#12") AND lineitem.l_quantity >= Decimal128(Some(100),15,2) AND 
lineitem.l_quantity <= Decimal128(Some(1100),15,2) AND part.p_size <= Int32(5) 
OR part.p_brand = Utf8View("Brand#23") AND lineitem.l_quantity >= 
Decimal128(Some(1000),15,2) AND lineitem.l_quantity <= 
Decimal128(Some(2000),15,2) AND part.p_size <= Int32(10) OR part.p_brand = 
Utf8View("Brand#34") AND lineitem.l_quantity >= Decimal128(Som [...]
 03)----Filter: lineitem.l_quantity >= Decimal128(Some(100),15,2) AND 
lineitem.l_quantity <= Decimal128(Some(1100),15,2) OR lineitem.l_quantity >= 
Decimal128(Some(1000),15,2) AND lineitem.l_quantity <= 
Decimal128(Some(2000),15,2) OR lineitem.l_quantity >= 
Decimal128(Some(2000),15,2) AND lineitem.l_quantity <= 
Decimal128(Some(3000),15,2)
 04)------TableScan: lineitem projection=[l_partkey, l_quantity], 
partial_filters=[lineitem.l_quantity >= Decimal128(Some(100),15,2) AND 
lineitem.l_quantity <= Decimal128(Some(1100),15,2) OR lineitem.l_quantity >= 
Decimal128(Some(1000),15,2) AND lineitem.l_quantity <= 
Decimal128(Some(2000),15,2) OR lineitem.l_quantity >= 
Decimal128(Some(2000),15,2) AND lineitem.l_quantity <= 
Decimal128(Some(3000),15,2)]
-05)----Filter: (part.p_brand = Utf8("Brand#12") AND part.p_size <= Int32(5) OR 
part.p_brand = Utf8("Brand#23") AND part.p_size <= Int32(10) OR part.p_brand = 
Utf8("Brand#34") AND part.p_size <= Int32(15)) AND part.p_size >= Int32(1)
-06)------TableScan: part projection=[p_partkey, p_brand, p_size], 
partial_filters=[part.p_size >= Int32(1), part.p_brand = Utf8("Brand#12") AND 
part.p_size <= Int32(5) OR part.p_brand = Utf8("Brand#23") AND part.p_size <= 
Int32(10) OR part.p_brand = Utf8("Brand#34") AND part.p_size <= Int32(15)]
+05)----Filter: (part.p_brand = Utf8View("Brand#12") AND part.p_size <= 
Int32(5) OR part.p_brand = Utf8View("Brand#23") AND part.p_size <= Int32(10) OR 
part.p_brand = Utf8View("Brand#34") AND part.p_size <= Int32(15)) AND 
part.p_size >= Int32(1)
+06)------TableScan: part projection=[p_partkey, p_brand, p_size], 
partial_filters=[part.p_size >= Int32(1), part.p_brand = Utf8View("Brand#12") 
AND part.p_size <= Int32(5) OR part.p_brand = Utf8View("Brand#23") AND 
part.p_size <= Int32(10) OR part.p_brand = Utf8View("Brand#34") AND part.p_size 
<= Int32(15)]
 physical_plan
 01)CoalesceBatchesExec: target_batch_size=8192
 02)--HashJoinExec: mode=Partitioned, join_type=Inner, on=[(l_partkey@0, 
p_partkey@0)], filter=p_brand@1 = Brand#12 AND l_quantity@0 >= Some(100),15,2 
AND l_quantity@0 <= Some(1100),15,2 AND p_size@2 <= 5 OR p_brand@1 = Brand#23 
AND l_quantity@0 >= Some(1000),15,2 AND l_quantity@0 <= Some(2000),15,2 AND 
p_size@2 <= 10 OR p_brand@1 = Brand#34 AND l_quantity@0 >= Some(2000),15,2 AND 
l_quantity@0 <= Some(3000),15,2 AND p_size@2 <= 15, projection=[l_partkey@0]
@@ -755,8 +755,8 @@ logical_plan
 05)--------Inner Join: lineitem.l_partkey = part.p_partkey
 06)----------TableScan: lineitem projection=[l_partkey, l_extendedprice, 
l_discount]
 07)----------Projection: part.p_partkey
-08)------------Filter: part.p_brand = Utf8("Brand#12") OR part.p_brand = 
Utf8("Brand#23")
-09)--------------TableScan: part projection=[p_partkey, p_brand], 
partial_filters=[part.p_brand = Utf8("Brand#12") OR part.p_brand = 
Utf8("Brand#23")]
+08)------------Filter: part.p_brand = Utf8View("Brand#12") OR part.p_brand = 
Utf8View("Brand#23")
+09)--------------TableScan: part projection=[p_partkey, p_brand], 
partial_filters=[part.p_brand = Utf8View("Brand#12") OR part.p_brand = 
Utf8View("Brand#23")]
 10)------TableScan: partsupp projection=[ps_partkey, ps_suppkey]
 physical_plan
 01)AggregateExec: mode=SinglePartitioned, gby=[p_partkey@2 as p_partkey], 
aggr=[sum(lineitem.l_extendedprice), avg(lineitem.l_discount), count(DISTINCT 
partsupp.ps_suppkey)]
diff --git a/datafusion/sqllogictest/test_files/struct.slt 
b/datafusion/sqllogictest/test_files/struct.slt
index 2074dfac31..46e15a4d6d 100644
--- a/datafusion/sqllogictest/test_files/struct.slt
+++ b/datafusion/sqllogictest/test_files/struct.slt
@@ -53,9 +53,9 @@ select * from struct_values;
 query TT
 select arrow_typeof(s1), arrow_typeof(s2) from struct_values;
 ----
-Struct(c0 Int32) Struct(a Int32, b Utf8)
-Struct(c0 Int32) Struct(a Int32, b Utf8)
-Struct(c0 Int32) Struct(a Int32, b Utf8)
+Struct(c0 Int32) Struct(a Int32, b Utf8View)
+Struct(c0 Int32) Struct(a Int32, b Utf8View)
+Struct(c0 Int32) Struct(a Int32, b Utf8View)
 
 
 # struct[i]
@@ -392,7 +392,7 @@ create table t(a struct<r varchar, c int>, b struct<r 
varchar, c float>) as valu
 query T
 select arrow_typeof([a, b]) from t;
 ----
-List(Field { name: "item", data_type: Struct([Field { name: "r", data_type: 
Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field 
{ name: "c", data_type: Float32, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }]), nullable: true, dict_id: 0, dict_is_ordered: false, 
metadata: {} })
+List(Field { name: "item", data_type: Struct([Field { name: "r", data_type: 
Utf8View, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, 
Field { name: "c", data_type: Float32, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }]), nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} })
 
 query ?
 select [a, b] from t;
@@ -443,12 +443,12 @@ select * from t;
 query T
 select arrow_typeof(c1) from t;
 ----
-Struct(r Utf8, b Int32)
+Struct(r Utf8View, b Int32)
 
 query T
 select arrow_typeof(c2) from t;
 ----
-Struct(r Utf8, b Float32)
+Struct(r Utf8View, b Float32)
 
 statement ok
 drop table t;
@@ -498,9 +498,9 @@ select coalesce(s1) from t;
 query T
 select arrow_typeof(coalesce(s1, s2)) from t;
 ----
-Struct(a Float32, b Utf8)
-Struct(a Float32, b Utf8)
-Struct(a Float32, b Utf8)
+Struct(a Float32, b Utf8View)
+Struct(a Float32, b Utf8View)
+Struct(a Float32, b Utf8View)
 
 statement ok
 drop table t;
@@ -525,9 +525,9 @@ select coalesce(s1, s2) from t;
 query T
 select arrow_typeof(coalesce(s1, s2)) from t;
 ----
-Struct(a Float32, b Utf8)
-Struct(a Float32, b Utf8)
-Struct(a Float32, b Utf8)
+Struct(a Float32, b Utf8View)
+Struct(a Float32, b Utf8View)
+Struct(a Float32, b Utf8View)
 
 statement ok
 drop table t;
@@ -562,7 +562,7 @@ create table t(a struct(r varchar, c int), b struct(r 
varchar, c float)) as valu
 query T
 select arrow_typeof([a, b]) from t;
 ----
-List(Field { name: "item", data_type: Struct([Field { name: "r", data_type: 
Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field 
{ name: "c", data_type: Float32, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }]), nullable: true, dict_id: 0, dict_is_ordered: false, 
metadata: {} })
+List(Field { name: "item", data_type: Struct([Field { name: "r", data_type: 
Utf8View, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, 
Field { name: "c", data_type: Float32, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }]), nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} })
 
 statement ok
 drop table t;
@@ -585,13 +585,13 @@ create table t(a struct(r varchar, c int, g float), b 
struct(r varchar, c float,
 query T
 select arrow_typeof(a) from t;
 ----
-Struct(r Utf8, c Int32, g Float32)
+Struct(r Utf8View, c Int32, g Float32)
 
 # type of each column should not coerced but perserve as it is
 query T
 select arrow_typeof(b) from t;
 ----
-Struct(r Utf8, c Float32, g Int32)
+Struct(r Utf8View, c Float32, g Int32)
 
 statement ok
 drop table t;
diff --git a/datafusion/sqllogictest/test_files/subquery_sort.slt 
b/datafusion/sqllogictest/test_files/subquery_sort.slt
index 5d22bf92e7..d993515f4d 100644
--- a/datafusion/sqllogictest/test_files/subquery_sort.slt
+++ b/datafusion/sqllogictest/test_files/subquery_sort.slt
@@ -100,7 +100,7 @@ physical_plan
 01)ProjectionExec: expr=[c1@0 as c1, r@1 as r]
 02)--SortExec: TopK(fetch=2), expr=[c1@0 ASC NULLS LAST, c3@2 ASC NULLS LAST, 
c9@3 ASC NULLS LAST], preserve_partitioning=[false]
 03)----ProjectionExec: expr=[c1@0 as c1, rank() ORDER BY [sink_table.c1 DESC 
NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as r, c3@1 as 
c3, c9@2 as c9]
-04)------BoundedWindowAggExec: wdw=[rank() ORDER BY [sink_table.c1 DESC NULLS 
FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: 
"rank() ORDER BY [sink_table.c1 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: false, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Utf8(NULL)), end_bound: CurrentRow, is_causal: false }], 
mode=[Sorted]
+04)------BoundedWindowAggExec: wdw=[rank() ORDER BY [sink_table.c1 DESC NULLS 
FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: 
"rank() ORDER BY [sink_table.c1 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: false, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Utf8View(NULL)), end_bound: CurrentRow, is_causal: false 
}], mode=[Sorted]
 05)--------SortExec: expr=[c1@0 DESC], preserve_partitioning=[false]
 06)----------DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, 
c3, c9], file_type=csv, has_header=true
 
@@ -127,9 +127,8 @@ physical_plan
 02)--SortExec: TopK(fetch=2), expr=[c1@0 ASC NULLS LAST, c3@2 ASC NULLS LAST, 
c9@3 ASC NULLS LAST], preserve_partitioning=[false]
 03)----ProjectionExec: expr=[c1@0 as c1, rank() ORDER BY 
[sink_table_with_utf8view.c1 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW@3 as r, c3@1 as c3, c9@2 as c9]
 04)------BoundedWindowAggExec: wdw=[rank() ORDER BY 
[sink_table_with_utf8view.c1 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW: Ok(Field { name: "rank() ORDER BY 
[sink_table_with_utf8view.c1 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: false, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Utf8View(NULL)), end_bound: CurrentRow, is_causal: false 
}], m [...]
-05)--------SortPreservingMergeExec: [c1@0 DESC]
-06)----------SortExec: expr=[c1@0 DESC], preserve_partitioning=[true]
-07)------------DataSourceExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+05)--------SortExec: expr=[c1@0 DESC], preserve_partitioning=[false]
+06)----------DataSourceExec: partitions=1, partition_sizes=[1]
 
 statement ok
 DROP TABLE sink_table_with_utf8view;
diff --git a/datafusion/sqllogictest/test_files/tpch/plans/q10.slt.part 
b/datafusion/sqllogictest/test_files/tpch/plans/q10.slt.part
index fee496f920..04de9153a0 100644
--- a/datafusion/sqllogictest/test_files/tpch/plans/q10.slt.part
+++ b/datafusion/sqllogictest/test_files/tpch/plans/q10.slt.part
@@ -65,8 +65,8 @@ logical_plan
 12)--------------------Filter: orders.o_orderdate >= Date32("1993-10-01") AND 
orders.o_orderdate < Date32("1994-01-01")
 13)----------------------TableScan: orders projection=[o_orderkey, o_custkey, 
o_orderdate], partial_filters=[orders.o_orderdate >= Date32("1993-10-01"), 
orders.o_orderdate < Date32("1994-01-01")]
 14)--------------Projection: lineitem.l_orderkey, lineitem.l_extendedprice, 
lineitem.l_discount
-15)----------------Filter: lineitem.l_returnflag = Utf8("R")
-16)------------------TableScan: lineitem projection=[l_orderkey, 
l_extendedprice, l_discount, l_returnflag], 
partial_filters=[lineitem.l_returnflag = Utf8("R")]
+15)----------------Filter: lineitem.l_returnflag = Utf8View("R")
+16)------------------TableScan: lineitem projection=[l_orderkey, 
l_extendedprice, l_discount, l_returnflag], 
partial_filters=[lineitem.l_returnflag = Utf8View("R")]
 17)----------TableScan: nation projection=[n_nationkey, n_name]
 physical_plan
 01)SortPreservingMergeExec: [revenue@2 DESC], fetch=10
diff --git a/datafusion/sqllogictest/test_files/tpch/plans/q11.slt.part 
b/datafusion/sqllogictest/test_files/tpch/plans/q11.slt.part
index 1dba8c0537..a6225daae4 100644
--- a/datafusion/sqllogictest/test_files/tpch/plans/q11.slt.part
+++ b/datafusion/sqllogictest/test_files/tpch/plans/q11.slt.part
@@ -58,8 +58,8 @@ logical_plan
 09)----------------TableScan: partsupp projection=[ps_partkey, ps_suppkey, 
ps_availqty, ps_supplycost], partial_filters=[Boolean(true)]
 10)----------------TableScan: supplier projection=[s_suppkey, s_nationkey]
 11)------------Projection: nation.n_nationkey
-12)--------------Filter: nation.n_name = Utf8("GERMANY")
-13)----------------TableScan: nation projection=[n_nationkey, n_name], 
partial_filters=[nation.n_name = Utf8("GERMANY")]
+12)--------------Filter: nation.n_name = Utf8View("GERMANY")
+13)----------------TableScan: nation projection=[n_nationkey, n_name], 
partial_filters=[nation.n_name = Utf8View("GERMANY")]
 14)------SubqueryAlias: __scalar_sq_1
 15)--------Projection: CAST(CAST(sum(partsupp.ps_supplycost * 
partsupp.ps_availqty) AS Float64) * Float64(0.0001) AS Decimal128(38, 15))
 16)----------Aggregate: groupBy=[[]], aggr=[[sum(partsupp.ps_supplycost * 
CAST(partsupp.ps_availqty AS Decimal128(10, 0)))]]
@@ -70,8 +70,8 @@ logical_plan
 21)--------------------TableScan: partsupp projection=[ps_suppkey, 
ps_availqty, ps_supplycost]
 22)--------------------TableScan: supplier projection=[s_suppkey, s_nationkey]
 23)----------------Projection: nation.n_nationkey
-24)------------------Filter: nation.n_name = Utf8("GERMANY")
-25)--------------------TableScan: nation projection=[n_nationkey, n_name], 
partial_filters=[nation.n_name = Utf8("GERMANY")]
+24)------------------Filter: nation.n_name = Utf8View("GERMANY")
+25)--------------------TableScan: nation projection=[n_nationkey, n_name], 
partial_filters=[nation.n_name = Utf8View("GERMANY")]
 physical_plan
 01)SortExec: TopK(fetch=10), expr=[value@1 DESC], preserve_partitioning=[false]
 02)--ProjectionExec: expr=[ps_partkey@0 as ps_partkey, 
sum(partsupp.ps_supplycost * partsupp.ps_availqty)@1 as value]
diff --git a/datafusion/sqllogictest/test_files/tpch/plans/q12.slt.part 
b/datafusion/sqllogictest/test_files/tpch/plans/q12.slt.part
index 3757fc48db..f7344daed8 100644
--- a/datafusion/sqllogictest/test_files/tpch/plans/q12.slt.part
+++ b/datafusion/sqllogictest/test_files/tpch/plans/q12.slt.part
@@ -51,12 +51,12 @@ order by
 logical_plan
 01)Sort: lineitem.l_shipmode ASC NULLS LAST
 02)--Projection: lineitem.l_shipmode, sum(CASE WHEN orders.o_orderpriority = 
Utf8("1-URGENT") OR orders.o_orderpriority = Utf8("2-HIGH") THEN Int64(1) ELSE 
Int64(0) END) AS high_line_count, sum(CASE WHEN orders.o_orderpriority != 
Utf8("1-URGENT") AND orders.o_orderpriority != Utf8("2-HIGH") THEN Int64(1) 
ELSE Int64(0) END) AS low_line_count
-03)----Aggregate: groupBy=[[lineitem.l_shipmode]], aggr=[[sum(CASE WHEN 
orders.o_orderpriority = Utf8("1-URGENT") OR orders.o_orderpriority = 
Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END), sum(CASE WHEN 
orders.o_orderpriority != Utf8("1-URGENT") AND orders.o_orderpriority != 
Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)]]
+03)----Aggregate: groupBy=[[lineitem.l_shipmode]], aggr=[[sum(CASE WHEN 
orders.o_orderpriority = Utf8View("1-URGENT") OR orders.o_orderpriority = 
Utf8View("2-HIGH") THEN Int64(1) ELSE Int64(0) END) AS sum(CASE WHEN 
orders.o_orderpriority = Utf8("1-URGENT") OR orders.o_orderpriority = 
Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END), sum(CASE WHEN 
orders.o_orderpriority != Utf8View("1-URGENT") AND orders.o_orderpriority != 
Utf8View("2-HIGH") THEN Int64(1) ELSE Int64(0) END) AS sum(CASE WHE [...]
 04)------Projection: lineitem.l_shipmode, orders.o_orderpriority
 05)--------Inner Join: lineitem.l_orderkey = orders.o_orderkey
 06)----------Projection: lineitem.l_orderkey, lineitem.l_shipmode
-07)------------Filter: (lineitem.l_shipmode = Utf8("MAIL") OR 
lineitem.l_shipmode = Utf8("SHIP")) AND lineitem.l_receiptdate > 
lineitem.l_commitdate AND lineitem.l_shipdate < lineitem.l_commitdate AND 
lineitem.l_receiptdate >= Date32("1994-01-01") AND lineitem.l_receiptdate < 
Date32("1995-01-01")
-08)--------------TableScan: lineitem projection=[l_orderkey, l_shipdate, 
l_commitdate, l_receiptdate, l_shipmode], partial_filters=[lineitem.l_shipmode 
= Utf8("MAIL") OR lineitem.l_shipmode = Utf8("SHIP"), lineitem.l_receiptdate > 
lineitem.l_commitdate, lineitem.l_shipdate < lineitem.l_commitdate, 
lineitem.l_receiptdate >= Date32("1994-01-01"), lineitem.l_receiptdate < 
Date32("1995-01-01")]
+07)------------Filter: (lineitem.l_shipmode = Utf8View("MAIL") OR 
lineitem.l_shipmode = Utf8View("SHIP")) AND lineitem.l_receiptdate > 
lineitem.l_commitdate AND lineitem.l_shipdate < lineitem.l_commitdate AND 
lineitem.l_receiptdate >= Date32("1994-01-01") AND lineitem.l_receiptdate < 
Date32("1995-01-01")
+08)--------------TableScan: lineitem projection=[l_orderkey, l_shipdate, 
l_commitdate, l_receiptdate, l_shipmode], partial_filters=[lineitem.l_shipmode 
= Utf8View("MAIL") OR lineitem.l_shipmode = Utf8View("SHIP"), 
lineitem.l_receiptdate > lineitem.l_commitdate, lineitem.l_shipdate < 
lineitem.l_commitdate, lineitem.l_receiptdate >= Date32("1994-01-01"), 
lineitem.l_receiptdate < Date32("1995-01-01")]
 09)----------TableScan: orders projection=[o_orderkey, o_orderpriority]
 physical_plan
 01)SortPreservingMergeExec: [l_shipmode@0 ASC NULLS LAST]
diff --git a/datafusion/sqllogictest/test_files/tpch/plans/q13.slt.part 
b/datafusion/sqllogictest/test_files/tpch/plans/q13.slt.part
index e9d9cf141d..96f3bd6edf 100644
--- a/datafusion/sqllogictest/test_files/tpch/plans/q13.slt.part
+++ b/datafusion/sqllogictest/test_files/tpch/plans/q13.slt.part
@@ -50,8 +50,8 @@ logical_plan
 08)--------------Left Join: customer.c_custkey = orders.o_custkey
 09)----------------TableScan: customer projection=[c_custkey]
 10)----------------Projection: orders.o_orderkey, orders.o_custkey
-11)------------------Filter: orders.o_comment NOT LIKE 
Utf8("%special%requests%")
-12)--------------------TableScan: orders projection=[o_orderkey, o_custkey, 
o_comment], partial_filters=[orders.o_comment NOT LIKE 
Utf8("%special%requests%")]
+11)------------------Filter: orders.o_comment NOT LIKE 
Utf8View("%special%requests%")
+12)--------------------TableScan: orders projection=[o_orderkey, o_custkey, 
o_comment], partial_filters=[orders.o_comment NOT LIKE 
Utf8View("%special%requests%")]
 physical_plan
 01)SortPreservingMergeExec: [custdist@1 DESC, c_count@0 DESC], fetch=10
 02)--SortExec: TopK(fetch=10), expr=[custdist@1 DESC, c_count@0 DESC], 
preserve_partitioning=[true]
diff --git a/datafusion/sqllogictest/test_files/tpch/plans/q14.slt.part 
b/datafusion/sqllogictest/test_files/tpch/plans/q14.slt.part
index 1104af2bdc..8d8dd68c3d 100644
--- a/datafusion/sqllogictest/test_files/tpch/plans/q14.slt.part
+++ b/datafusion/sqllogictest/test_files/tpch/plans/q14.slt.part
@@ -33,7 +33,7 @@ where
 ----
 logical_plan
 01)Projection: Float64(100) * CAST(sum(CASE WHEN part.p_type LIKE 
Utf8("PROMO%") THEN lineitem.l_extendedprice * Int64(1) - lineitem.l_discount 
ELSE Int64(0) END) AS Float64) / CAST(sum(lineitem.l_extendedprice * Int64(1) - 
lineitem.l_discount) AS Float64) AS promo_revenue
-02)--Aggregate: groupBy=[[]], aggr=[[sum(CASE WHEN part.p_type LIKE 
Utf8("PROMO%") THEN __common_expr_1 ELSE Decimal128(Some(0),38,4) END) AS 
sum(CASE WHEN part.p_type LIKE Utf8("PROMO%") THEN lineitem.l_extendedprice * 
Int64(1) - lineitem.l_discount ELSE Int64(0) END), sum(__common_expr_1) AS 
sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]]
+02)--Aggregate: groupBy=[[]], aggr=[[sum(CASE WHEN part.p_type LIKE 
Utf8View("PROMO%") THEN __common_expr_1 ELSE Decimal128(Some(0),38,4) END) AS 
sum(CASE WHEN part.p_type LIKE Utf8("PROMO%") THEN lineitem.l_extendedprice * 
Int64(1) - lineitem.l_discount ELSE Int64(0) END), sum(__common_expr_1) AS 
sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]]
 03)----Projection: lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - 
lineitem.l_discount) AS __common_expr_1, part.p_type
 04)------Inner Join: lineitem.l_partkey = part.p_partkey
 05)--------Projection: lineitem.l_partkey, lineitem.l_extendedprice, 
lineitem.l_discount
diff --git a/datafusion/sqllogictest/test_files/tpch/plans/q16.slt.part 
b/datafusion/sqllogictest/test_files/tpch/plans/q16.slt.part
index c648f164c8..edc452284c 100644
--- a/datafusion/sqllogictest/test_files/tpch/plans/q16.slt.part
+++ b/datafusion/sqllogictest/test_files/tpch/plans/q16.slt.part
@@ -58,12 +58,12 @@ logical_plan
 06)----------Projection: partsupp.ps_suppkey, part.p_brand, part.p_type, 
part.p_size
 07)------------Inner Join: partsupp.ps_partkey = part.p_partkey
 08)--------------TableScan: partsupp projection=[ps_partkey, ps_suppkey]
-09)--------------Filter: part.p_brand != Utf8("Brand#45") AND part.p_type NOT 
LIKE Utf8("MEDIUM POLISHED%") AND part.p_size IN ([Int32(49), Int32(14), 
Int32(23), Int32(45), Int32(19), Int32(3), Int32(36), Int32(9)])
-10)----------------TableScan: part projection=[p_partkey, p_brand, p_type, 
p_size], partial_filters=[part.p_brand != Utf8("Brand#45"), part.p_type NOT 
LIKE Utf8("MEDIUM POLISHED%"), part.p_size IN ([Int32(49), Int32(14), 
Int32(23), Int32(45), Int32(19), Int32(3), Int32(36), Int32(9)])]
+09)--------------Filter: part.p_brand != Utf8View("Brand#45") AND part.p_type 
NOT LIKE Utf8View("MEDIUM POLISHED%") AND part.p_size IN ([Int32(49), 
Int32(14), Int32(23), Int32(45), Int32(19), Int32(3), Int32(36), Int32(9)])
+10)----------------TableScan: part projection=[p_partkey, p_brand, p_type, 
p_size], partial_filters=[part.p_brand != Utf8View("Brand#45"), part.p_type NOT 
LIKE Utf8View("MEDIUM POLISHED%"), part.p_size IN ([Int32(49), Int32(14), 
Int32(23), Int32(45), Int32(19), Int32(3), Int32(36), Int32(9)])]
 11)----------SubqueryAlias: __correlated_sq_1
 12)------------Projection: supplier.s_suppkey
-13)--------------Filter: supplier.s_comment LIKE Utf8("%Customer%Complaints%")
-14)----------------TableScan: supplier projection=[s_suppkey, s_comment], 
partial_filters=[supplier.s_comment LIKE Utf8("%Customer%Complaints%")]
+13)--------------Filter: supplier.s_comment LIKE 
Utf8View("%Customer%Complaints%")
+14)----------------TableScan: supplier projection=[s_suppkey, s_comment], 
partial_filters=[supplier.s_comment LIKE Utf8View("%Customer%Complaints%")]
 physical_plan
 01)SortPreservingMergeExec: [supplier_cnt@3 DESC, p_brand@0 ASC NULLS LAST, 
p_type@1 ASC NULLS LAST, p_size@2 ASC NULLS LAST], fetch=10
 02)--SortExec: TopK(fetch=10), expr=[supplier_cnt@3 DESC, p_brand@0 ASC NULLS 
LAST, p_type@1 ASC NULLS LAST, p_size@2 ASC NULLS LAST], 
preserve_partitioning=[true]
diff --git a/datafusion/sqllogictest/test_files/tpch/plans/q17.slt.part 
b/datafusion/sqllogictest/test_files/tpch/plans/q17.slt.part
index 02553890bc..51a0d09642 100644
--- a/datafusion/sqllogictest/test_files/tpch/plans/q17.slt.part
+++ b/datafusion/sqllogictest/test_files/tpch/plans/q17.slt.part
@@ -44,8 +44,8 @@ logical_plan
 06)----------Inner Join: lineitem.l_partkey = part.p_partkey
 07)------------TableScan: lineitem projection=[l_partkey, l_quantity, 
l_extendedprice]
 08)------------Projection: part.p_partkey
-09)--------------Filter: part.p_brand = Utf8("Brand#23") AND part.p_container 
= Utf8("MED BOX")
-10)----------------TableScan: part projection=[p_partkey, p_brand, 
p_container], partial_filters=[part.p_brand = Utf8("Brand#23"), 
part.p_container = Utf8("MED BOX")]
+09)--------------Filter: part.p_brand = Utf8View("Brand#23") AND 
part.p_container = Utf8View("MED BOX")
+10)----------------TableScan: part projection=[p_partkey, p_brand, 
p_container], partial_filters=[part.p_brand = Utf8View("Brand#23"), 
part.p_container = Utf8View("MED BOX")]
 11)--------SubqueryAlias: __scalar_sq_1
 12)----------Projection: CAST(Float64(0.2) * CAST(avg(lineitem.l_quantity) AS 
Float64) AS Decimal128(30, 15)), lineitem.l_partkey
 13)------------Aggregate: groupBy=[[lineitem.l_partkey]], 
aggr=[[avg(lineitem.l_quantity)]]
diff --git a/datafusion/sqllogictest/test_files/tpch/plans/q19.slt.part 
b/datafusion/sqllogictest/test_files/tpch/plans/q19.slt.part
index b0e5b2e904..3b15fb3d8e 100644
--- a/datafusion/sqllogictest/test_files/tpch/plans/q19.slt.part
+++ b/datafusion/sqllogictest/test_files/tpch/plans/q19.slt.part
@@ -57,19 +57,19 @@ logical_plan
 01)Projection: sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) 
AS revenue
 02)--Aggregate: groupBy=[[]], aggr=[[sum(lineitem.l_extendedprice * 
(Decimal128(Some(1),20,0) - lineitem.l_discount)) AS 
sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]]
 03)----Projection: lineitem.l_extendedprice, lineitem.l_discount
-04)------Inner Join: lineitem.l_partkey = part.p_partkey Filter: part.p_brand 
= Utf8("Brand#12") AND part.p_container IN ([Utf8("SM CASE"), Utf8("SM BOX"), 
Utf8("SM PACK"), Utf8("SM PKG")]) AND lineitem.l_quantity >= 
Decimal128(Some(100),15,2) AND lineitem.l_quantity <= 
Decimal128(Some(1100),15,2) AND part.p_size <= Int32(5) OR part.p_brand = 
Utf8("Brand#23") AND part.p_container IN ([Utf8("MED BAG"), Utf8("MED BOX"), 
Utf8("MED PKG"), Utf8("MED PACK")]) AND lineitem.l_quantity >= Decimal [...]
+04)------Inner Join: lineitem.l_partkey = part.p_partkey Filter: part.p_brand 
= Utf8View("Brand#12") AND part.p_container IN ([Utf8View("SM CASE"), 
Utf8View("SM BOX"), Utf8View("SM PACK"), Utf8View("SM PKG")]) AND 
lineitem.l_quantity >= Decimal128(Some(100),15,2) AND lineitem.l_quantity <= 
Decimal128(Some(1100),15,2) AND part.p_size <= Int32(5) OR part.p_brand = 
Utf8View("Brand#23") AND part.p_container IN ([Utf8View("MED BAG"), 
Utf8View("MED BOX"), Utf8View("MED PKG"), Utf8View("MED PAC [...]
 05)--------Projection: lineitem.l_partkey, lineitem.l_quantity, 
lineitem.l_extendedprice, lineitem.l_discount
-06)----------Filter: (lineitem.l_quantity >= Decimal128(Some(100),15,2) AND 
lineitem.l_quantity <= Decimal128(Some(1100),15,2) OR lineitem.l_quantity >= 
Decimal128(Some(1000),15,2) AND lineitem.l_quantity <= 
Decimal128(Some(2000),15,2) OR lineitem.l_quantity >= 
Decimal128(Some(2000),15,2) AND lineitem.l_quantity <= 
Decimal128(Some(3000),15,2)) AND (lineitem.l_shipmode = Utf8("AIR") OR 
lineitem.l_shipmode = Utf8("AIR REG")) AND lineitem.l_shipinstruct = 
Utf8("DELIVER IN PERSON")
-07)------------TableScan: lineitem projection=[l_partkey, l_quantity, 
l_extendedprice, l_discount, l_shipinstruct, l_shipmode], 
partial_filters=[lineitem.l_shipmode = Utf8("AIR") OR lineitem.l_shipmode = 
Utf8("AIR REG"), lineitem.l_shipinstruct = Utf8("DELIVER IN PERSON"), 
lineitem.l_quantity >= Decimal128(Some(100),15,2) AND lineitem.l_quantity <= 
Decimal128(Some(1100),15,2) OR lineitem.l_quantity >= 
Decimal128(Some(1000),15,2) AND lineitem.l_quantity <= 
Decimal128(Some(2000),15,2) OR l [...]
-08)--------Filter: (part.p_brand = Utf8("Brand#12") AND part.p_container IN 
([Utf8("SM CASE"), Utf8("SM BOX"), Utf8("SM PACK"), Utf8("SM PKG")]) AND 
part.p_size <= Int32(5) OR part.p_brand = Utf8("Brand#23") AND part.p_container 
IN ([Utf8("MED BAG"), Utf8("MED BOX"), Utf8("MED PKG"), Utf8("MED PACK")]) AND 
part.p_size <= Int32(10) OR part.p_brand = Utf8("Brand#34") AND 
part.p_container IN ([Utf8("LG CASE"), Utf8("LG BOX"), Utf8("LG PACK"), 
Utf8("LG PKG")]) AND part.p_size <= Int32(15)) A [...]
-09)----------TableScan: part projection=[p_partkey, p_brand, p_size, 
p_container], partial_filters=[part.p_size >= Int32(1), part.p_brand = 
Utf8("Brand#12") AND part.p_container IN ([Utf8("SM CASE"), Utf8("SM BOX"), 
Utf8("SM PACK"), Utf8("SM PKG")]) AND part.p_size <= Int32(5) OR part.p_brand = 
Utf8("Brand#23") AND part.p_container IN ([Utf8("MED BAG"), Utf8("MED BOX"), 
Utf8("MED PKG"), Utf8("MED PACK")]) AND part.p_size <= Int32(10) OR 
part.p_brand = Utf8("Brand#34") AND part.p_containe [...]
+06)----------Filter: (lineitem.l_quantity >= Decimal128(Some(100),15,2) AND 
lineitem.l_quantity <= Decimal128(Some(1100),15,2) OR lineitem.l_quantity >= 
Decimal128(Some(1000),15,2) AND lineitem.l_quantity <= 
Decimal128(Some(2000),15,2) OR lineitem.l_quantity >= 
Decimal128(Some(2000),15,2) AND lineitem.l_quantity <= 
Decimal128(Some(3000),15,2)) AND (lineitem.l_shipmode = Utf8View("AIR") OR 
lineitem.l_shipmode = Utf8View("AIR REG")) AND lineitem.l_shipinstruct = 
Utf8View("DELIVER IN PERSON")
+07)------------TableScan: lineitem projection=[l_partkey, l_quantity, 
l_extendedprice, l_discount, l_shipinstruct, l_shipmode], 
partial_filters=[lineitem.l_shipmode = Utf8View("AIR") OR lineitem.l_shipmode = 
Utf8View("AIR REG"), lineitem.l_shipinstruct = Utf8View("DELIVER IN PERSON"), 
lineitem.l_quantity >= Decimal128(Some(100),15,2) AND lineitem.l_quantity <= 
Decimal128(Some(1100),15,2) OR lineitem.l_quantity >= 
Decimal128(Some(1000),15,2) AND lineitem.l_quantity <= Decimal128(Some(2000 
[...]
+08)--------Filter: (part.p_brand = Utf8View("Brand#12") AND part.p_container 
IN ([Utf8View("SM CASE"), Utf8View("SM BOX"), Utf8View("SM PACK"), Utf8View("SM 
PKG")]) AND part.p_size <= Int32(5) OR part.p_brand = Utf8View("Brand#23") AND 
part.p_container IN ([Utf8View("MED BAG"), Utf8View("MED BOX"), Utf8View("MED 
PKG"), Utf8View("MED PACK")]) AND part.p_size <= Int32(10) OR part.p_brand = 
Utf8View("Brand#34") AND part.p_container IN ([Utf8View("LG CASE"), 
Utf8View("LG BOX"), Utf8View("LG  [...]
+09)----------TableScan: part projection=[p_partkey, p_brand, p_size, 
p_container], partial_filters=[part.p_size >= Int32(1), part.p_brand = 
Utf8View("Brand#12") AND part.p_container IN ([Utf8View("SM CASE"), 
Utf8View("SM BOX"), Utf8View("SM PACK"), Utf8View("SM PKG")]) AND part.p_size 
<= Int32(5) OR part.p_brand = Utf8View("Brand#23") AND part.p_container IN 
([Utf8View("MED BAG"), Utf8View("MED BOX"), Utf8View("MED PKG"), Utf8View("MED 
PACK")]) AND part.p_size <= Int32(10) OR part.p_bran [...]
 physical_plan
 01)ProjectionExec: expr=[sum(lineitem.l_extendedprice * Int64(1) - 
lineitem.l_discount)@0 as revenue]
 02)--AggregateExec: mode=Final, gby=[], aggr=[sum(lineitem.l_extendedprice * 
Int64(1) - lineitem.l_discount)]
 03)----CoalescePartitionsExec
 04)------AggregateExec: mode=Partial, gby=[], 
aggr=[sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]
 05)--------CoalesceBatchesExec: target_batch_size=8192
-06)----------HashJoinExec: mode=Partitioned, join_type=Inner, 
on=[(l_partkey@0, p_partkey@0)], filter=p_brand@1 = Brand#12 AND Use 
p_container@3 IN (SET) ([Literal { value: Utf8("SM CASE") }, Literal { value: 
Utf8("SM BOX") }, Literal { value: Utf8("SM PACK") }, Literal { value: Utf8("SM 
PKG") }]) AND l_quantity@0 >= Some(100),15,2 AND l_quantity@0 <= 
Some(1100),15,2 AND p_size@2 <= 5 OR p_brand@1 = Brand#23 AND Use p_container@3 
IN (SET) ([Literal { value: Utf8("MED BAG") }, Literal { v [...]
+06)----------HashJoinExec: mode=Partitioned, join_type=Inner, 
on=[(l_partkey@0, p_partkey@0)], filter=p_brand@1 = Brand#12 AND p_container@3 
IN ([Literal { value: Utf8View("SM CASE") }, Literal { value: Utf8View("SM 
BOX") }, Literal { value: Utf8View("SM PACK") }, Literal { value: Utf8View("SM 
PKG") }]) AND l_quantity@0 >= Some(100),15,2 AND l_quantity@0 <= 
Some(1100),15,2 AND p_size@2 <= 5 OR p_brand@1 = Brand#23 AND p_container@3 IN 
([Literal { value: Utf8View("MED BAG") }, Literal { v [...]
 07)------------CoalesceBatchesExec: target_batch_size=8192
 08)--------------RepartitionExec: partitioning=Hash([l_partkey@0], 4), 
input_partitions=4
 09)----------------CoalesceBatchesExec: target_batch_size=8192
@@ -78,6 +78,6 @@ physical_plan
 12)------------CoalesceBatchesExec: target_batch_size=8192
 13)--------------RepartitionExec: partitioning=Hash([p_partkey@0], 4), 
input_partitions=4
 14)----------------CoalesceBatchesExec: target_batch_size=8192
-15)------------------FilterExec: (p_brand@1 = Brand#12 AND Use p_container@3 
IN (SET) ([Literal { value: Utf8("SM CASE") }, Literal { value: Utf8("SM BOX") 
}, Literal { value: Utf8("SM PACK") }, Literal { value: Utf8("SM PKG") }]) AND 
p_size@2 <= 5 OR p_brand@1 = Brand#23 AND Use p_container@3 IN (SET) ([Literal 
{ value: Utf8("MED BAG") }, Literal { value: Utf8("MED BOX") }, Literal { 
value: Utf8("MED PKG") }, Literal { value: Utf8("MED PACK") }]) AND p_size@2 <= 
10 OR p_brand@1 = Brand# [...]
+15)------------------FilterExec: (p_brand@1 = Brand#12 AND p_container@3 IN 
([Literal { value: Utf8View("SM CASE") }, Literal { value: Utf8View("SM BOX") 
}, Literal { value: Utf8View("SM PACK") }, Literal { value: Utf8View("SM PKG") 
}]) AND p_size@2 <= 5 OR p_brand@1 = Brand#23 AND p_container@3 IN ([Literal { 
value: Utf8View("MED BAG") }, Literal { value: Utf8View("MED BOX") }, Literal { 
value: Utf8View("MED PKG") }, Literal { value: Utf8View("MED PACK") }]) AND 
p_size@2 <= 10 OR p_bran [...]
 16)--------------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
 17)----------------------DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/part.tbl]]}, 
projection=[p_partkey, p_brand, p_size, p_container], file_type=csv, 
has_header=false
diff --git a/datafusion/sqllogictest/test_files/tpch/plans/q2.slt.part 
b/datafusion/sqllogictest/test_files/tpch/plans/q2.slt.part
index 2a8ee9f229..b2e0fb0cd1 100644
--- a/datafusion/sqllogictest/test_files/tpch/plans/q2.slt.part
+++ b/datafusion/sqllogictest/test_files/tpch/plans/q2.slt.part
@@ -75,14 +75,14 @@ logical_plan
 10)------------------Projection: part.p_partkey, part.p_mfgr, 
partsupp.ps_suppkey, partsupp.ps_supplycost
 11)--------------------Inner Join: part.p_partkey = partsupp.ps_partkey
 12)----------------------Projection: part.p_partkey, part.p_mfgr
-13)------------------------Filter: part.p_size = Int32(15) AND part.p_type 
LIKE Utf8("%BRASS")
-14)--------------------------TableScan: part projection=[p_partkey, p_mfgr, 
p_type, p_size], partial_filters=[part.p_size = Int32(15), part.p_type LIKE 
Utf8("%BRASS")]
+13)------------------------Filter: part.p_size = Int32(15) AND part.p_type 
LIKE Utf8View("%BRASS")
+14)--------------------------TableScan: part projection=[p_partkey, p_mfgr, 
p_type, p_size], partial_filters=[part.p_size = Int32(15), part.p_type LIKE 
Utf8View("%BRASS")]
 15)----------------------TableScan: partsupp projection=[ps_partkey, 
ps_suppkey, ps_supplycost]
 16)------------------TableScan: supplier projection=[s_suppkey, s_name, 
s_address, s_nationkey, s_phone, s_acctbal, s_comment]
 17)--------------TableScan: nation projection=[n_nationkey, n_name, 
n_regionkey]
 18)----------Projection: region.r_regionkey
-19)------------Filter: region.r_name = Utf8("EUROPE")
-20)--------------TableScan: region projection=[r_regionkey, r_name], 
partial_filters=[region.r_name = Utf8("EUROPE")]
+19)------------Filter: region.r_name = Utf8View("EUROPE")
+20)--------------TableScan: region projection=[r_regionkey, r_name], 
partial_filters=[region.r_name = Utf8View("EUROPE")]
 21)------SubqueryAlias: __scalar_sq_1
 22)--------Projection: min(partsupp.ps_supplycost), partsupp.ps_partkey
 23)----------Aggregate: groupBy=[[partsupp.ps_partkey]], 
aggr=[[min(partsupp.ps_supplycost)]]
@@ -96,8 +96,8 @@ logical_plan
 31)------------------------TableScan: supplier projection=[s_suppkey, 
s_nationkey]
 32)--------------------TableScan: nation projection=[n_nationkey, n_regionkey]
 33)----------------Projection: region.r_regionkey
-34)------------------Filter: region.r_name = Utf8("EUROPE")
-35)--------------------TableScan: region projection=[r_regionkey, r_name], 
partial_filters=[region.r_name = Utf8("EUROPE")]
+34)------------------Filter: region.r_name = Utf8View("EUROPE")
+35)--------------------TableScan: region projection=[r_regionkey, r_name], 
partial_filters=[region.r_name = Utf8View("EUROPE")]
 physical_plan
 01)SortPreservingMergeExec: [s_acctbal@0 DESC, n_name@2 ASC NULLS LAST, 
s_name@1 ASC NULLS LAST, p_partkey@3 ASC NULLS LAST], fetch=10
 02)--SortExec: TopK(fetch=10), expr=[s_acctbal@0 DESC, n_name@2 ASC NULLS 
LAST, s_name@1 ASC NULLS LAST, p_partkey@3 ASC NULLS LAST], 
preserve_partitioning=[true]
diff --git a/datafusion/sqllogictest/test_files/tpch/plans/q20.slt.part 
b/datafusion/sqllogictest/test_files/tpch/plans/q20.slt.part
index 4844d5fae6..0b994de411 100644
--- a/datafusion/sqllogictest/test_files/tpch/plans/q20.slt.part
+++ b/datafusion/sqllogictest/test_files/tpch/plans/q20.slt.part
@@ -63,8 +63,8 @@ logical_plan
 05)--------Inner Join: supplier.s_nationkey = nation.n_nationkey
 06)----------TableScan: supplier projection=[s_suppkey, s_name, s_address, 
s_nationkey]
 07)----------Projection: nation.n_nationkey
-08)------------Filter: nation.n_name = Utf8("CANADA")
-09)--------------TableScan: nation projection=[n_nationkey, n_name], 
partial_filters=[nation.n_name = Utf8("CANADA")]
+08)------------Filter: nation.n_name = Utf8View("CANADA")
+09)--------------TableScan: nation projection=[n_nationkey, n_name], 
partial_filters=[nation.n_name = Utf8View("CANADA")]
 10)------SubqueryAlias: __correlated_sq_2
 11)--------Projection: partsupp.ps_suppkey
 12)----------Inner Join: partsupp.ps_partkey = __scalar_sq_3.l_partkey, 
partsupp.ps_suppkey = __scalar_sq_3.l_suppkey Filter: CAST(partsupp.ps_availqty 
AS Float64) > __scalar_sq_3.Float64(0.5) * sum(lineitem.l_quantity)
@@ -72,8 +72,8 @@ logical_plan
 14)--------------TableScan: partsupp projection=[ps_partkey, ps_suppkey, 
ps_availqty]
 15)--------------SubqueryAlias: __correlated_sq_1
 16)----------------Projection: part.p_partkey
-17)------------------Filter: part.p_name LIKE Utf8("forest%")
-18)--------------------TableScan: part projection=[p_partkey, p_name], 
partial_filters=[part.p_name LIKE Utf8("forest%")]
+17)------------------Filter: part.p_name LIKE Utf8View("forest%")
+18)--------------------TableScan: part projection=[p_partkey, p_name], 
partial_filters=[part.p_name LIKE Utf8View("forest%")]
 19)------------SubqueryAlias: __scalar_sq_3
 20)--------------Projection: Float64(0.5) * CAST(sum(lineitem.l_quantity) AS 
Float64), lineitem.l_partkey, lineitem.l_suppkey
 21)----------------Aggregate: groupBy=[[lineitem.l_partkey, 
lineitem.l_suppkey]], aggr=[[sum(lineitem.l_quantity)]]
diff --git a/datafusion/sqllogictest/test_files/tpch/plans/q21.slt.part 
b/datafusion/sqllogictest/test_files/tpch/plans/q21.slt.part
index bb3e884e27..e521715240 100644
--- a/datafusion/sqllogictest/test_files/tpch/plans/q21.slt.part
+++ b/datafusion/sqllogictest/test_files/tpch/plans/q21.slt.part
@@ -76,11 +76,11 @@ logical_plan
 16)----------------------------Filter: lineitem.l_receiptdate > 
lineitem.l_commitdate
 17)------------------------------TableScan: lineitem projection=[l_orderkey, 
l_suppkey, l_commitdate, l_receiptdate], 
partial_filters=[lineitem.l_receiptdate > lineitem.l_commitdate]
 18)--------------------Projection: orders.o_orderkey
-19)----------------------Filter: orders.o_orderstatus = Utf8("F")
-20)------------------------TableScan: orders projection=[o_orderkey, 
o_orderstatus], partial_filters=[orders.o_orderstatus = Utf8("F")]
+19)----------------------Filter: orders.o_orderstatus = Utf8View("F")
+20)------------------------TableScan: orders projection=[o_orderkey, 
o_orderstatus], partial_filters=[orders.o_orderstatus = Utf8View("F")]
 21)----------------Projection: nation.n_nationkey
-22)------------------Filter: nation.n_name = Utf8("SAUDI ARABIA")
-23)--------------------TableScan: nation projection=[n_nationkey, n_name], 
partial_filters=[nation.n_name = Utf8("SAUDI ARABIA")]
+22)------------------Filter: nation.n_name = Utf8View("SAUDI ARABIA")
+23)--------------------TableScan: nation projection=[n_nationkey, n_name], 
partial_filters=[nation.n_name = Utf8View("SAUDI ARABIA")]
 24)------------SubqueryAlias: __correlated_sq_1
 25)--------------SubqueryAlias: l2
 26)----------------TableScan: lineitem projection=[l_orderkey, l_suppkey]
diff --git a/datafusion/sqllogictest/test_files/tpch/plans/q3.slt.part 
b/datafusion/sqllogictest/test_files/tpch/plans/q3.slt.part
index 2ad496ef26..d982ec32e9 100644
--- a/datafusion/sqllogictest/test_files/tpch/plans/q3.slt.part
+++ b/datafusion/sqllogictest/test_files/tpch/plans/q3.slt.part
@@ -50,8 +50,8 @@ logical_plan
 06)----------Projection: orders.o_orderkey, orders.o_orderdate, 
orders.o_shippriority
 07)------------Inner Join: customer.c_custkey = orders.o_custkey
 08)--------------Projection: customer.c_custkey
-09)----------------Filter: customer.c_mktsegment = Utf8("BUILDING")
-10)------------------TableScan: customer projection=[c_custkey, c_mktsegment], 
partial_filters=[customer.c_mktsegment = Utf8("BUILDING")]
+09)----------------Filter: customer.c_mktsegment = Utf8View("BUILDING")
+10)------------------TableScan: customer projection=[c_custkey, c_mktsegment], 
partial_filters=[customer.c_mktsegment = Utf8View("BUILDING")]
 11)--------------Filter: orders.o_orderdate < Date32("1995-03-15")
 12)----------------TableScan: orders projection=[o_orderkey, o_custkey, 
o_orderdate, o_shippriority], partial_filters=[orders.o_orderdate < 
Date32("1995-03-15")]
 13)----------Projection: lineitem.l_orderkey, lineitem.l_extendedprice, 
lineitem.l_discount
diff --git a/datafusion/sqllogictest/test_files/tpch/plans/q5.slt.part 
b/datafusion/sqllogictest/test_files/tpch/plans/q5.slt.part
index f192f987b3..15636056b8 100644
--- a/datafusion/sqllogictest/test_files/tpch/plans/q5.slt.part
+++ b/datafusion/sqllogictest/test_files/tpch/plans/q5.slt.part
@@ -64,8 +64,8 @@ logical_plan
 19)------------------TableScan: supplier projection=[s_suppkey, s_nationkey]
 20)--------------TableScan: nation projection=[n_nationkey, n_name, 
n_regionkey]
 21)----------Projection: region.r_regionkey
-22)------------Filter: region.r_name = Utf8("ASIA")
-23)--------------TableScan: region projection=[r_regionkey, r_name], 
partial_filters=[region.r_name = Utf8("ASIA")]
+22)------------Filter: region.r_name = Utf8View("ASIA")
+23)--------------TableScan: region projection=[r_regionkey, r_name], 
partial_filters=[region.r_name = Utf8View("ASIA")]
 physical_plan
 01)SortPreservingMergeExec: [revenue@1 DESC]
 02)--SortExec: expr=[revenue@1 DESC], preserve_partitioning=[true]
diff --git a/datafusion/sqllogictest/test_files/tpch/plans/q7.slt.part 
b/datafusion/sqllogictest/test_files/tpch/plans/q7.slt.part
index e03de9596f..291d56e43f 100644
--- a/datafusion/sqllogictest/test_files/tpch/plans/q7.slt.part
+++ b/datafusion/sqllogictest/test_files/tpch/plans/q7.slt.part
@@ -63,7 +63,7 @@ logical_plan
 03)----Aggregate: groupBy=[[shipping.supp_nation, shipping.cust_nation, 
shipping.l_year]], aggr=[[sum(shipping.volume)]]
 04)------SubqueryAlias: shipping
 05)--------Projection: n1.n_name AS supp_nation, n2.n_name AS cust_nation, 
date_part(Utf8("YEAR"), lineitem.l_shipdate) AS l_year, 
lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount) AS 
volume
-06)----------Inner Join: customer.c_nationkey = n2.n_nationkey Filter: 
n1.n_name = Utf8("FRANCE") AND n2.n_name = Utf8("GERMANY") OR n1.n_name = 
Utf8("GERMANY") AND n2.n_name = Utf8("FRANCE")
+06)----------Inner Join: customer.c_nationkey = n2.n_nationkey Filter: 
n1.n_name = Utf8View("FRANCE") AND n2.n_name = Utf8View("GERMANY") OR n1.n_name 
= Utf8View("GERMANY") AND n2.n_name = Utf8View("FRANCE")
 07)------------Projection: lineitem.l_extendedprice, lineitem.l_discount, 
lineitem.l_shipdate, customer.c_nationkey, n1.n_name
 08)--------------Inner Join: supplier.s_nationkey = n1.n_nationkey
 09)----------------Projection: supplier.s_nationkey, lineitem.l_extendedprice, 
lineitem.l_discount, lineitem.l_shipdate, customer.c_nationkey
@@ -78,11 +78,11 @@ logical_plan
 18)------------------------TableScan: orders projection=[o_orderkey, o_custkey]
 19)--------------------TableScan: customer projection=[c_custkey, c_nationkey]
 20)----------------SubqueryAlias: n1
-21)------------------Filter: nation.n_name = Utf8("FRANCE") OR nation.n_name = 
Utf8("GERMANY")
-22)--------------------TableScan: nation projection=[n_nationkey, n_name], 
partial_filters=[nation.n_name = Utf8("FRANCE") OR nation.n_name = 
Utf8("GERMANY")]
+21)------------------Filter: nation.n_name = Utf8View("FRANCE") OR 
nation.n_name = Utf8View("GERMANY")
+22)--------------------TableScan: nation projection=[n_nationkey, n_name], 
partial_filters=[nation.n_name = Utf8View("FRANCE") OR nation.n_name = 
Utf8View("GERMANY")]
 23)------------SubqueryAlias: n2
-24)--------------Filter: nation.n_name = Utf8("GERMANY") OR nation.n_name = 
Utf8("FRANCE")
-25)----------------TableScan: nation projection=[n_nationkey, n_name], 
partial_filters=[nation.n_name = Utf8("GERMANY") OR nation.n_name = 
Utf8("FRANCE")]
+24)--------------Filter: nation.n_name = Utf8View("GERMANY") OR nation.n_name 
= Utf8View("FRANCE")
+25)----------------TableScan: nation projection=[n_nationkey, n_name], 
partial_filters=[nation.n_name = Utf8View("GERMANY") OR nation.n_name = 
Utf8View("FRANCE")]
 physical_plan
 01)SortPreservingMergeExec: [supp_nation@0 ASC NULLS LAST, cust_nation@1 ASC 
NULLS LAST, l_year@2 ASC NULLS LAST]
 02)--SortExec: expr=[supp_nation@0 ASC NULLS LAST, cust_nation@1 ASC NULLS 
LAST, l_year@2 ASC NULLS LAST], preserve_partitioning=[true]
diff --git a/datafusion/sqllogictest/test_files/tpch/plans/q8.slt.part 
b/datafusion/sqllogictest/test_files/tpch/plans/q8.slt.part
index 88ceffd62a..50171c528d 100644
--- a/datafusion/sqllogictest/test_files/tpch/plans/q8.slt.part
+++ b/datafusion/sqllogictest/test_files/tpch/plans/q8.slt.part
@@ -58,7 +58,7 @@ order by
 logical_plan
 01)Sort: all_nations.o_year ASC NULLS LAST
 02)--Projection: all_nations.o_year, CAST(CAST(sum(CASE WHEN 
all_nations.nation = Utf8("BRAZIL") THEN all_nations.volume ELSE Int64(0) END) 
AS Decimal128(12, 2)) / CAST(sum(all_nations.volume) AS Decimal128(12, 2)) AS 
Decimal128(15, 2)) AS mkt_share
-03)----Aggregate: groupBy=[[all_nations.o_year]], aggr=[[sum(CASE WHEN 
all_nations.nation = Utf8("BRAZIL") THEN all_nations.volume ELSE 
Decimal128(Some(0),38,4) END) AS sum(CASE WHEN all_nations.nation = 
Utf8("BRAZIL") THEN all_nations.volume ELSE Int64(0) END), 
sum(all_nations.volume)]]
+03)----Aggregate: groupBy=[[all_nations.o_year]], aggr=[[sum(CASE WHEN 
all_nations.nation = Utf8View("BRAZIL") THEN all_nations.volume ELSE 
Decimal128(Some(0),38,4) END) AS sum(CASE WHEN all_nations.nation = 
Utf8("BRAZIL") THEN all_nations.volume ELSE Int64(0) END), 
sum(all_nations.volume)]]
 04)------SubqueryAlias: all_nations
 05)--------Projection: date_part(Utf8("YEAR"), orders.o_orderdate) AS o_year, 
lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount) AS 
volume, n2.n_name AS nation
 06)----------Inner Join: n1.n_regionkey = region.r_regionkey
@@ -75,8 +75,8 @@ logical_plan
 17)--------------------------------Projection: lineitem.l_orderkey, 
lineitem.l_suppkey, lineitem.l_extendedprice, lineitem.l_discount
 18)----------------------------------Inner Join: part.p_partkey = 
lineitem.l_partkey
 19)------------------------------------Projection: part.p_partkey
-20)--------------------------------------Filter: part.p_type = Utf8("ECONOMY 
ANODIZED STEEL")
-21)----------------------------------------TableScan: part 
projection=[p_partkey, p_type], partial_filters=[part.p_type = Utf8("ECONOMY 
ANODIZED STEEL")]
+20)--------------------------------------Filter: part.p_type = 
Utf8View("ECONOMY ANODIZED STEEL")
+21)----------------------------------------TableScan: part 
projection=[p_partkey, p_type], partial_filters=[part.p_type = 
Utf8View("ECONOMY ANODIZED STEEL")]
 22)------------------------------------TableScan: lineitem 
projection=[l_orderkey, l_partkey, l_suppkey, l_extendedprice, l_discount]
 23)--------------------------------TableScan: supplier projection=[s_suppkey, 
s_nationkey]
 24)----------------------------Filter: orders.o_orderdate >= 
Date32("1995-01-01") AND orders.o_orderdate <= Date32("1996-12-31")
@@ -87,8 +87,8 @@ logical_plan
 29)----------------SubqueryAlias: n2
 30)------------------TableScan: nation projection=[n_nationkey, n_name]
 31)------------Projection: region.r_regionkey
-32)--------------Filter: region.r_name = Utf8("AMERICA")
-33)----------------TableScan: region projection=[r_regionkey, r_name], 
partial_filters=[region.r_name = Utf8("AMERICA")]
+32)--------------Filter: region.r_name = Utf8View("AMERICA")
+33)----------------TableScan: region projection=[r_regionkey, r_name], 
partial_filters=[region.r_name = Utf8View("AMERICA")]
 physical_plan
 01)SortPreservingMergeExec: [o_year@0 ASC NULLS LAST]
 02)--SortExec: expr=[o_year@0 ASC NULLS LAST], preserve_partitioning=[true]
diff --git a/datafusion/sqllogictest/test_files/tpch/plans/q9.slt.part 
b/datafusion/sqllogictest/test_files/tpch/plans/q9.slt.part
index 8ccf967187..3b31c1bc2e 100644
--- a/datafusion/sqllogictest/test_files/tpch/plans/q9.slt.part
+++ b/datafusion/sqllogictest/test_files/tpch/plans/q9.slt.part
@@ -67,8 +67,8 @@ logical_plan
 13)------------------------Projection: lineitem.l_orderkey, 
lineitem.l_partkey, lineitem.l_suppkey, lineitem.l_quantity, 
lineitem.l_extendedprice, lineitem.l_discount
 14)--------------------------Inner Join: part.p_partkey = lineitem.l_partkey
 15)----------------------------Projection: part.p_partkey
-16)------------------------------Filter: part.p_name LIKE Utf8("%green%")
-17)--------------------------------TableScan: part projection=[p_partkey, 
p_name], partial_filters=[part.p_name LIKE Utf8("%green%")]
+16)------------------------------Filter: part.p_name LIKE Utf8View("%green%")
+17)--------------------------------TableScan: part projection=[p_partkey, 
p_name], partial_filters=[part.p_name LIKE Utf8View("%green%")]
 18)----------------------------TableScan: lineitem projection=[l_orderkey, 
l_partkey, l_suppkey, l_quantity, l_extendedprice, l_discount]
 19)------------------------TableScan: supplier projection=[s_suppkey, 
s_nationkey]
 20)--------------------TableScan: partsupp projection=[ps_partkey, ps_suppkey, 
ps_supplycost]
diff --git a/datafusion/sqllogictest/test_files/union.slt 
b/datafusion/sqllogictest/test_files/union.slt
index fb2166150b..d549f555f9 100644
--- a/datafusion/sqllogictest/test_files/union.slt
+++ b/datafusion/sqllogictest/test_files/union.slt
@@ -492,8 +492,8 @@ logical_plan
 07)------------Projection:
 08)--------------Aggregate: groupBy=[[aggregate_test_100.c1]], aggr=[[]]
 09)----------------Projection: aggregate_test_100.c1
-10)------------------Filter: aggregate_test_100.c13 != 
Utf8("C2GT5KVyOPZpgKVl110TyZO0NcJ434")
-11)--------------------TableScan: aggregate_test_100 projection=[c1, c13], 
partial_filters=[aggregate_test_100.c13 != 
Utf8("C2GT5KVyOPZpgKVl110TyZO0NcJ434")]
+10)------------------Filter: aggregate_test_100.c13 != 
Utf8View("C2GT5KVyOPZpgKVl110TyZO0NcJ434")
+11)--------------------TableScan: aggregate_test_100 projection=[c1, c13], 
partial_filters=[aggregate_test_100.c13 != 
Utf8View("C2GT5KVyOPZpgKVl110TyZO0NcJ434")]
 12)----Projection: Int64(1) AS cnt
 13)------Limit: skip=0, fetch=3
 14)--------EmptyRelation
@@ -829,10 +829,10 @@ ORDER BY c1
 logical_plan
 01)Sort: c1 ASC NULLS LAST
 02)--Union
-03)----Filter: aggregate_test_100.c1 = Utf8("a")
-04)------TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, 
c8, c9, c10, c11, c12, c13], partial_filters=[aggregate_test_100.c1 = Utf8("a")]
-05)----Filter: aggregate_test_100.c1 = Utf8("a")
-06)------TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, 
c8, c9, c10, c11, c12, c13], partial_filters=[aggregate_test_100.c1 = Utf8("a")]
+03)----Filter: aggregate_test_100.c1 = Utf8View("a")
+04)------TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, 
c8, c9, c10, c11, c12, c13], partial_filters=[aggregate_test_100.c1 = 
Utf8View("a")]
+05)----Filter: aggregate_test_100.c1 = Utf8View("a")
+06)------TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, 
c8, c9, c10, c11, c12, c13], partial_filters=[aggregate_test_100.c1 = 
Utf8View("a")]
 physical_plan
 01)CoalescePartitionsExec
 02)--UnionExec
diff --git a/datafusion/sqllogictest/test_files/union_by_name.slt 
b/datafusion/sqllogictest/test_files/union_by_name.slt
index 9572e6efc3..233885618f 100644
--- a/datafusion/sqllogictest/test_files/union_by_name.slt
+++ b/datafusion/sqllogictest/test_files/union_by_name.slt
@@ -348,7 +348,7 @@ Schema {
     fields: [
         Field {
             name: "x",
-            data_type: Utf8,
+            data_type: Utf8View,
             nullable: true,
             dict_id: 0,
             dict_is_ordered: false,
@@ -356,7 +356,7 @@ Schema {
         },
         Field {
             name: "y",
-            data_type: Utf8,
+            data_type: Utf8View,
             nullable: true,
             dict_id: 0,
             dict_is_ordered: false,
@@ -364,7 +364,7 @@ Schema {
         },
         Field {
             name: "z",
-            data_type: Utf8,
+            data_type: Utf8View,
             nullable: true,
             dict_id: 0,
             dict_is_ordered: false,
@@ -387,7 +387,7 @@ Schema {
     fields: [
         Field {
             name: "x",
-            data_type: Utf8,
+            data_type: Utf8View,
             nullable: true,
             dict_id: 0,
             dict_is_ordered: false,
@@ -395,7 +395,7 @@ Schema {
         },
         Field {
             name: "y",
-            data_type: Utf8,
+            data_type: Utf8View,
             nullable: true,
             dict_id: 0,
             dict_is_ordered: false,
@@ -403,7 +403,7 @@ Schema {
         },
         Field {
             name: "z",
-            data_type: Utf8,
+            data_type: Utf8View,
             nullable: true,
             dict_id: 0,
             dict_is_ordered: false,
diff --git a/datafusion/sqllogictest/test_files/update.slt 
b/datafusion/sqllogictest/test_files/update.slt
index 908d2b34ae..9f2c16b211 100644
--- a/datafusion/sqllogictest/test_files/update.slt
+++ b/datafusion/sqllogictest/test_files/update.slt
@@ -31,7 +31,7 @@ explain update t1 set a=1, b=2, c=3.0, d=NULL;
 ----
 logical_plan
 01)Dml: op=[Update] table=[t1]
-02)--Projection: CAST(Int64(1) AS Int32) AS a, CAST(Int64(2) AS Utf8) AS b, 
Float64(3) AS c, CAST(NULL AS Int32) AS d
+02)--Projection: CAST(Int64(1) AS Int32) AS a, CAST(Int64(2) AS Utf8View) AS 
b, Float64(3) AS c, CAST(NULL AS Int32) AS d
 03)----TableScan: t1
 physical_plan_error This feature is not implemented: Unsupported logical plan: 
Dml(Update)
 
@@ -40,7 +40,7 @@ explain update t1 set a=c+1, b=a, c=c+1.0, d=b;
 ----
 logical_plan
 01)Dml: op=[Update] table=[t1]
-02)--Projection: CAST(t1.c + CAST(Int64(1) AS Float64) AS Int32) AS a, 
CAST(t1.a AS Utf8) AS b, t1.c + Float64(1) AS c, CAST(t1.b AS Int32) AS d
+02)--Projection: CAST(t1.c + CAST(Int64(1) AS Float64) AS Int32) AS a, 
CAST(t1.a AS Utf8View) AS b, t1.c + Float64(1) AS c, CAST(t1.b AS Int32) AS d
 03)----TableScan: t1
 physical_plan_error This feature is not implemented: Unsupported logical plan: 
Dml(Update)
 
@@ -69,7 +69,7 @@ explain update t1 set b = t2.b, c = t2.a, d = 1 from t2 where 
t1.a = t2.a and t1
 logical_plan
 01)Dml: op=[Update] table=[t1]
 02)--Projection: t1.a AS a, t2.b AS b, CAST(t2.a AS Float64) AS c, 
CAST(Int64(1) AS Int32) AS d
-03)----Filter: t1.a = t2.a AND t1.b > Utf8("foo") AND t2.c > Float64(1)
+03)----Filter: t1.a = t2.a AND t1.b > CAST(Utf8("foo") AS Utf8View) AND t2.c > 
Float64(1)
 04)------Cross Join: 
 05)--------TableScan: t1
 06)--------TableScan: t2
@@ -89,7 +89,7 @@ explain update t1 as T set b = t2.b, c = t.a, d = 1 from t2 
where t.a = t2.a and
 logical_plan
 01)Dml: op=[Update] table=[t1]
 02)--Projection: t.a AS a, t2.b AS b, CAST(t.a AS Float64) AS c, CAST(Int64(1) 
AS Int32) AS d
-03)----Filter: t.a = t2.a AND t.b > Utf8("foo") AND t2.c > Float64(1)
+03)----Filter: t.a = t2.a AND t.b > CAST(Utf8("foo") AS Utf8View) AND t2.c > 
Float64(1)
 04)------Cross Join: 
 05)--------SubqueryAlias: t
 06)----------TableScan: t1
diff --git a/datafusion/sqllogictest/test_files/window.slt 
b/datafusion/sqllogictest/test_files/window.slt
index c68385c49b..c86921012f 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -1770,8 +1770,8 @@ logical_plan
 04)------Projection:
 05)--------Aggregate: groupBy=[[aggregate_test_100.c1]], aggr=[[]]
 06)----------Projection: aggregate_test_100.c1
-07)------------Filter: aggregate_test_100.c13 != 
Utf8("C2GT5KVyOPZpgKVl110TyZO0NcJ434")
-08)--------------TableScan: aggregate_test_100 projection=[c1, c13], 
partial_filters=[aggregate_test_100.c13 != 
Utf8("C2GT5KVyOPZpgKVl110TyZO0NcJ434")]
+07)------------Filter: aggregate_test_100.c13 != 
Utf8View("C2GT5KVyOPZpgKVl110TyZO0NcJ434")
+08)--------------TableScan: aggregate_test_100 projection=[c1, c13], 
partial_filters=[aggregate_test_100.c13 != 
Utf8View("C2GT5KVyOPZpgKVl110TyZO0NcJ434")]
 physical_plan
 01)ProjectionExec: expr=[count(Int64(1))@0 as global_count]
 02)--AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1))]
diff --git a/docs/source/user-guide/configs.md 
b/docs/source/user-guide/configs.md
index 4129ddc392..a794241dfc 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -130,7 +130,7 @@ Environment variables are read during `SessionConfig` 
initialisation so they mus
 | datafusion.sql_parser.enable_options_value_normalization                | 
false                     | When set to true, SQL parser will normalize options 
value (convert value to lowercase). Note that this option is ignored and will 
be removed in the future. All case-insensitive values are normalized 
automatically.                                                                  
                                                                                
                              [...]
 | datafusion.sql_parser.dialect                                           | 
generic                   | Configure the SQL dialect used by DataFusion's 
parser; supported values include: Generic, MySQL, PostgreSQL, Hive, SQLite, 
Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, Ansi, DuckDB and Databricks.  
                                                                                
                                                                                
                          [...]
 | datafusion.sql_parser.support_varchar_with_length                       | 
true                      | If true, permit lengths for `VARCHAR` such as 
`VARCHAR(20)`, but ignore the length. If false, error if a `VARCHAR` with a 
length is specified. The Arrow type system does not have a notion of maximum 
string length and thus DataFusion can not enforce such limits.                  
                                                                                
                              [...]
-| datafusion.sql_parser.map_varchar_to_utf8view                           | 
false                     | If true, `VARCHAR` is mapped to `Utf8View` during 
SQL planning. If false, `VARCHAR` is mapped to `Utf8` during SQL planning. 
Default is false.                                                               
                                                                                
                                                                                
                        [...]
+| datafusion.sql_parser.map_varchar_to_utf8view                           | 
true                      | If true, `VARCHAR` is mapped to `Utf8View` during 
SQL planning. If false, `VARCHAR` is mapped to `Utf8` during SQL planning. 
Default is false.                                                               
                                                                                
                                                                                
                        [...]
 | datafusion.sql_parser.collect_spans                                     | 
false                     | When set to true, the source locations relative to 
the original SQL query (i.e. 
[`Span`](https://docs.rs/sqlparser/latest/sqlparser/tokenizer/struct.Span.html))
 will be collected and recorded in the logical plan nodes.                      
                                                                                
                                                                     [...]
 | datafusion.sql_parser.recursion_limit                                   | 50 
                       | Specifies the recursion depth limit when parsing 
complex SQL Queries                                                             
                                                                                
                                                                                
                                                                                
                    [...]
 | datafusion.format.safe                                                  | 
true                      | If set to `true` any formatting errors will be 
written to the output instead of being converted into a [`std::fmt::Error`]     
                                                                                
                                                                                
                                                                                
                      [...]
diff --git a/parquet-testing b/parquet-testing
index 6e851ddd76..107b36603e 160000
--- a/parquet-testing
+++ b/parquet-testing
@@ -1 +1 @@
-Subproject commit 6e851ddd768d6af741c7b15dc594874399fc3cff
+Subproject commit 107b36603e051aee26bd93e04b871034f6c756c0


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

Reply via email to