This is an automated email from the ASF dual-hosted git repository.

yjshen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 463ec30743 feat: Remove compact row since it's no longer used (#6021)
463ec30743 is described below

commit 463ec3074318c5bf31a6a1377ab5f01159d8a2ce
Author: Yijie Shen <[email protected]>
AuthorDate: Mon Apr 17 21:08:35 2023 +0800

    feat: Remove compact row since it's no longer used (#6021)
    
    * Remove compact row since it's no longer used
    
    * rm
    
    * fmt
    
    * clippy
---
 datafusion/core/benches/jit.rs                     |  28 +--
 .../core/src/physical_plan/aggregates/row_hash.rs  |  20 +-
 datafusion/core/tests/row.rs                       |  29 +--
 datafusion/row/src/accessor.rs                     |   6 +-
 datafusion/row/src/jit/mod.rs                      | 203 ++---------------
 datafusion/row/src/jit/reader.rs                   |  14 +-
 datafusion/row/src/jit/writer.rs                   |  15 +-
 datafusion/row/src/layout.rs                       | 239 +++++++--------------
 datafusion/row/src/lib.rs                          | 193 +++--------------
 datafusion/row/src/reader.rs                       |  63 +-----
 datafusion/row/src/writer.rs                       | 107 +--------
 11 files changed, 153 insertions(+), 764 deletions(-)

diff --git a/datafusion/core/benches/jit.rs b/datafusion/core/benches/jit.rs
index 0c6de319d2..d42df8e033 100644
--- a/datafusion/core/benches/jit.rs
+++ b/datafusion/core/benches/jit.rs
@@ -25,7 +25,6 @@ use crate::criterion::Criterion;
 use crate::data_utils::{create_record_batches, create_schema};
 use datafusion::row::jit::writer::bench_write_batch_jit;
 use datafusion::row::writer::bench_write_batch;
-use datafusion::row::RowType;
 use std::sync::Arc;
 
 fn criterion_benchmark(c: &mut Criterion) {
@@ -37,38 +36,15 @@ fn criterion_benchmark(c: &mut Criterion) {
     let batches =
         create_record_batches(schema.clone(), array_len, partitions_len, 
batch_size);
 
-    c.bench_function("compact row serializer", |b| {
-        b.iter(|| {
-            criterion::black_box(
-                bench_write_batch(&batches, schema.clone(), 
RowType::Compact).unwrap(),
-            )
-        })
-    });
-
     c.bench_function("word aligned row serializer", |b| {
         b.iter(|| {
-            criterion::black_box(
-                bench_write_batch(&batches, schema.clone(), 
RowType::WordAligned)
-                    .unwrap(),
-            )
-        })
-    });
-
-    c.bench_function("compact row serializer jit", |b| {
-        b.iter(|| {
-            criterion::black_box(
-                bench_write_batch_jit(&batches, schema.clone(), 
RowType::Compact)
-                    .unwrap(),
-            )
+            criterion::black_box(bench_write_batch(&batches, 
schema.clone()).unwrap())
         })
     });
 
     c.bench_function("word aligned row serializer jit", |b| {
         b.iter(|| {
-            criterion::black_box(
-                bench_write_batch_jit(&batches, schema.clone(), 
RowType::WordAligned)
-                    .unwrap(),
-            )
+            criterion::black_box(bench_write_batch_jit(&batches, 
schema.clone()).unwrap())
         })
     });
 }
diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash.rs 
b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
index 9a75da02fb..d9e42e478d 100644
--- a/datafusion/core/src/physical_plan/aggregates/row_hash.rs
+++ b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
@@ -51,13 +51,13 @@ use datafusion_expr::Accumulator;
 use datafusion_row::accessor::RowAccessor;
 use datafusion_row::layout::RowLayout;
 use datafusion_row::reader::{read_row, RowReader};
-use datafusion_row::{MutableRecordBatch, RowType};
+use datafusion_row::MutableRecordBatch;
 use hashbrown::raw::RawTable;
 
 /// Grouping aggregate with row-format aggregation states inside.
 ///
 /// For each aggregation entry, we use:
-/// - [Compact] row represents grouping keys for fast hash computation and 
comparison directly on raw bytes.
+/// - [Arrow-row] represents grouping keys for fast hash computation and 
comparison directly on raw bytes.
 /// - [WordAligned] row to store aggregation state, designed to be 
CPU-friendly when updates over every field are often.
 ///
 /// The architecture is the following:
@@ -68,8 +68,8 @@ use hashbrown::raw::RawTable;
 /// 4. The state's RecordBatch is `merge`d to a new state
 /// 5. The state is mapped to the final value
 ///
-/// [Compact]: datafusion_row::layout::RowType::Compact
-/// [WordAligned]: datafusion_row::layout::RowType::WordAligned
+/// [Arrow-row]: OwnedRow
+/// [WordAligned]: datafusion_row::layout
 pub(crate) struct GroupedHashAggregateStream {
     schema: SchemaRef,
     input: SendableRecordBatchStream,
@@ -203,8 +203,7 @@ impl GroupedHashAggregateStream {
                 .collect(),
         )?;
 
-        let row_aggr_layout =
-            Arc::new(RowLayout::new(&row_aggr_schema, RowType::WordAligned));
+        let row_aggr_layout = Arc::new(RowLayout::new(&row_aggr_schema));
 
         let name = format!("GroupedHashAggregateStream[{partition}]");
         let aggr_state = AggregationState {
@@ -632,15 +631,14 @@ impl GroupedHashAggregateStream {
         // Store row accumulator results (either final output or intermediate 
state):
         let row_columns = match self.mode {
             AggregateMode::Partial => {
-                read_as_batch(&state_buffers, &self.row_aggr_schema, 
RowType::WordAligned)
+                read_as_batch(&state_buffers, &self.row_aggr_schema)
             }
             AggregateMode::Final
             | AggregateMode::FinalPartitioned
             | AggregateMode::Single => {
                 let mut results = vec![];
                 for (idx, acc) in self.row_accumulators.iter().enumerate() {
-                    let mut state_accessor =
-                        RowAccessor::new(&self.row_aggr_schema, 
RowType::WordAligned);
+                    let mut state_accessor = 
RowAccessor::new(&self.row_aggr_schema);
                     let current = state_buffers
                         .iter_mut()
                         .map(|buffer| {
@@ -727,10 +725,10 @@ impl GroupedHashAggregateStream {
     }
 }
 
-fn read_as_batch(rows: &[Vec<u8>], schema: &Schema, row_type: RowType) -> 
Vec<ArrayRef> {
+fn read_as_batch(rows: &[Vec<u8>], schema: &Schema) -> Vec<ArrayRef> {
     let row_num = rows.len();
     let mut output = MutableRecordBatch::new(row_num, 
Arc::new(schema.clone()));
-    let mut row = RowReader::new(schema, row_type);
+    let mut row = RowReader::new(schema);
 
     for data in rows {
         row.point_to(0, data);
diff --git a/datafusion/core/tests/row.rs b/datafusion/core/tests/row.rs
index 5eeb237e18..55310bb611 100644
--- a/datafusion/core/tests/row.rs
+++ b/datafusion/core/tests/row.rs
@@ -23,7 +23,6 @@ use datafusion::execution::context::SessionState;
 use datafusion::physical_plan::file_format::FileScanConfig;
 use datafusion::physical_plan::{collect, ExecutionPlan};
 use datafusion::prelude::SessionContext;
-use datafusion_row::layout::RowType::{Compact, WordAligned};
 use datafusion_row::reader::read_as_batch;
 use datafusion_row::writer::write_batch_unchecked;
 use object_store::{local::LocalFileSystem, path::Path, ObjectStore};
@@ -31,29 +30,6 @@ use std::sync::Arc;
 
 #[tokio::test]
 async fn test_with_parquet() -> Result<()> {
-    let ctx = SessionContext::new();
-    let state = ctx.state();
-    let task_ctx = state.task_ctx();
-    let projection = Some(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
-    let exec =
-        get_exec(&state, "alltypes_plain.parquet", projection.as_ref(), 
None).await?;
-    let schema = exec.schema().clone();
-
-    let batches = collect(exec, task_ctx).await?;
-    assert_eq!(1, batches.len());
-    let batch = &batches[0];
-
-    let mut vector = vec![0; 20480];
-    let row_offsets =
-        { write_batch_unchecked(&mut vector, 0, batch, 0, schema.clone(), 
Compact) };
-    let output_batch = { read_as_batch(&vector, schema, &row_offsets, 
Compact)? };
-    assert_eq!(*batch, output_batch);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_with_parquet_word_aligned() -> Result<()> {
     let ctx = SessionContext::new();
     let state = ctx.state();
     let task_ctx = state.task_ctx();
@@ -67,9 +43,8 @@ async fn test_with_parquet_word_aligned() -> Result<()> {
     let batch = &batches[0];
 
     let mut vector = vec![0; 20480];
-    let row_offsets =
-        { write_batch_unchecked(&mut vector, 0, batch, 0, schema.clone(), 
WordAligned) };
-    let output_batch = { read_as_batch(&vector, schema, &row_offsets, 
WordAligned)? };
+    let row_offsets = { write_batch_unchecked(&mut vector, 0, batch, 0, 
schema.clone()) };
+    let output_batch = { read_as_batch(&vector, schema, &row_offsets)? };
     assert_eq!(*batch, output_batch);
 
     Ok(())
diff --git a/datafusion/row/src/accessor.rs b/datafusion/row/src/accessor.rs
index e7b4ed8501..bba44f0e56 100644
--- a/datafusion/row/src/accessor.rs
+++ b/datafusion/row/src/accessor.rs
@@ -17,7 +17,7 @@
 
 //! [`RowAccessor`] provides a Read/Write/Modify access for row with all 
fixed-sized fields:
 
-use crate::layout::{RowLayout, RowType};
+use crate::layout::RowLayout;
 use crate::validity::NullBitsFormatter;
 use crate::{fn_get_idx, fn_get_idx_opt, fn_set_idx};
 use arrow::datatypes::{DataType, Schema};
@@ -116,9 +116,9 @@ macro_rules! fn_get_idx_scalar {
 
 impl<'a> RowAccessor<'a> {
     /// new
-    pub fn new(schema: &Schema, row_type: RowType) -> Self {
+    pub fn new(schema: &Schema) -> Self {
         Self {
-            layout: Arc::new(RowLayout::new(schema, row_type)),
+            layout: Arc::new(RowLayout::new(schema)),
             data: &mut [],
             base_offset: 0,
         }
diff --git a/datafusion/row/src/jit/mod.rs b/datafusion/row/src/jit/mod.rs
index 03b8eed186..803c96b176 100644
--- a/datafusion/row/src/jit/mod.rs
+++ b/datafusion/row/src/jit/mod.rs
@@ -45,7 +45,6 @@ fn fn_name<T>(f: T) -> &'static str {
 mod tests {
     use crate::jit::reader::read_as_batch_jit;
     use crate::jit::writer::write_batch_unchecked_jit;
-    use crate::layout::RowType::{Compact, WordAligned};
     use arrow::record_batch::RecordBatch;
     use arrow::{array::*, datatypes::*};
     use datafusion_common::Result;
@@ -54,26 +53,26 @@ mod tests {
     use DataType::*;
 
     macro_rules! fn_test_single_type {
-        ($ARRAY: ident, $TYPE: expr, $VEC: expr, $ROWTYPE: expr) => {
+        ($ARRAY: ident, $TYPE: expr, $VEC: expr) => {
             paste::item! {
                 #[test]
                 #[allow(non_snake_case)]
-                fn [<test_ $ROWTYPE _single_ $TYPE _jit>]() -> Result<()> {
+                fn [<test_single_ $TYPE _jit>]() -> Result<()> {
                     let schema = Arc::new(Schema::new(vec![Field::new("a", 
$TYPE, true)]));
                     let a = $ARRAY::from($VEC);
                     let batch = RecordBatch::try_new(schema.clone(), 
vec![Arc::new(a)])?;
                     let mut vector = vec![0; 1024];
                     let assembler = Assembler::default();
                     let row_offsets =
-                        { write_batch_unchecked_jit(&mut vector, 0, &batch, 0, 
schema.clone(), &assembler, $ROWTYPE)? };
-                    let output_batch = { read_as_batch_jit(&vector, schema, 
&row_offsets, &assembler, $ROWTYPE)? };
+                        { write_batch_unchecked_jit(&mut vector, 0, &batch, 0, 
schema.clone(), &assembler)? };
+                    let output_batch = { read_as_batch_jit(&vector, schema, 
&row_offsets, &assembler)? };
                     assert_eq!(batch, output_batch);
                     Ok(())
                 }
 
                 #[test]
                 #[allow(non_snake_case)]
-                fn [<test_ $ROWTYPE _single_ $TYPE _jit_null_free>]() -> 
Result<()> {
+                fn [<test_single_ $TYPE _jit_null_free>]() -> Result<()> {
                     let schema = Arc::new(Schema::new(vec![Field::new("a", 
$TYPE, false)]));
                     let v = $VEC.into_iter().filter(|o| 
o.is_some()).collect::<Vec<_>>();
                     let a = $ARRAY::from(v);
@@ -81,8 +80,8 @@ mod tests {
                     let mut vector = vec![0; 1024];
                     let assembler = Assembler::default();
                     let row_offsets =
-                        { write_batch_unchecked_jit(&mut vector, 0, &batch, 0, 
schema.clone(), &assembler, $ROWTYPE)? };
-                    let output_batch = { read_as_batch_jit(&vector, schema, 
&row_offsets, &assembler, $ROWTYPE)? };
+                        { write_batch_unchecked_jit(&mut vector, 0, &batch, 0, 
schema.clone(), &assembler)? };
+                    let output_batch = { read_as_batch_jit(&vector, schema, 
&row_offsets, &assembler)? };
                     assert_eq!(batch, output_batch);
                     Ok(())
                 }
@@ -93,240 +92,78 @@ mod tests {
     fn_test_single_type!(
         BooleanArray,
         Boolean,
-        vec![Some(true), Some(false), None, Some(true), None],
-        Compact
-    );
-
-    fn_test_single_type!(
-        BooleanArray,
-        Boolean,
-        vec![Some(true), Some(false), None, Some(true), None],
-        WordAligned
-    );
-
-    fn_test_single_type!(
-        Int8Array,
-        Int8,
-        vec![Some(5), Some(7), None, Some(0), Some(111)],
-        Compact
+        vec![Some(true), Some(false), None, Some(true), None]
     );
 
     fn_test_single_type!(
         Int8Array,
         Int8,
-        vec![Some(5), Some(7), None, Some(0), Some(111)],
-        WordAligned
+        vec![Some(5), Some(7), None, Some(0), Some(111)]
     );
 
     fn_test_single_type!(
         Int16Array,
         Int16,
-        vec![Some(5), Some(7), None, Some(0), Some(111)],
-        Compact
-    );
-
-    fn_test_single_type!(
-        Int16Array,
-        Int16,
-        vec![Some(5), Some(7), None, Some(0), Some(111)],
-        WordAligned
+        vec![Some(5), Some(7), None, Some(0), Some(111)]
     );
 
     fn_test_single_type!(
         Int32Array,
         Int32,
-        vec![Some(5), Some(7), None, Some(0), Some(111)],
-        Compact
-    );
-
-    fn_test_single_type!(
-        Int32Array,
-        Int32,
-        vec![Some(5), Some(7), None, Some(0), Some(111)],
-        WordAligned
+        vec![Some(5), Some(7), None, Some(0), Some(111)]
     );
 
     fn_test_single_type!(
         Int64Array,
         Int64,
-        vec![Some(5), Some(7), None, Some(0), Some(111)],
-        Compact
-    );
-
-    fn_test_single_type!(
-        Int64Array,
-        Int64,
-        vec![Some(5), Some(7), None, Some(0), Some(111)],
-        WordAligned
-    );
-
-    fn_test_single_type!(
-        UInt8Array,
-        UInt8,
-        vec![Some(5), Some(7), None, Some(0), Some(111)],
-        Compact
+        vec![Some(5), Some(7), None, Some(0), Some(111)]
     );
 
     fn_test_single_type!(
         UInt8Array,
         UInt8,
-        vec![Some(5), Some(7), None, Some(0), Some(111)],
-        WordAligned
+        vec![Some(5), Some(7), None, Some(0), Some(111)]
     );
 
     fn_test_single_type!(
         UInt16Array,
         UInt16,
-        vec![Some(5), Some(7), None, Some(0), Some(111)],
-        Compact
-    );
-
-    fn_test_single_type!(
-        UInt16Array,
-        UInt16,
-        vec![Some(5), Some(7), None, Some(0), Some(111)],
-        WordAligned
-    );
-
-    fn_test_single_type!(
-        UInt32Array,
-        UInt32,
-        vec![Some(5), Some(7), None, Some(0), Some(111)],
-        Compact
+        vec![Some(5), Some(7), None, Some(0), Some(111)]
     );
 
     fn_test_single_type!(
         UInt32Array,
         UInt32,
-        vec![Some(5), Some(7), None, Some(0), Some(111)],
-        WordAligned
-    );
-
-    fn_test_single_type!(
-        UInt64Array,
-        UInt64,
-        vec![Some(5), Some(7), None, Some(0), Some(111)],
-        Compact
+        vec![Some(5), Some(7), None, Some(0), Some(111)]
     );
 
     fn_test_single_type!(
         UInt64Array,
         UInt64,
-        vec![Some(5), Some(7), None, Some(0), Some(111)],
-        WordAligned
+        vec![Some(5), Some(7), None, Some(0), Some(111)]
     );
 
     fn_test_single_type!(
         Float32Array,
         Float32,
-        vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)],
-        Compact
-    );
-
-    fn_test_single_type!(
-        Float32Array,
-        Float32,
-        vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)],
-        WordAligned
+        vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)]
     );
 
     fn_test_single_type!(
         Float64Array,
         Float64,
-        vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)],
-        Compact
-    );
-
-    fn_test_single_type!(
-        Float64Array,
-        Float64,
-        vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)],
-        WordAligned
+        vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)]
     );
 
     fn_test_single_type!(
         Date32Array,
         Date32,
-        vec![Some(5), Some(7), None, Some(0), Some(111)],
-        Compact
-    );
-
-    fn_test_single_type!(
-        Date32Array,
-        Date32,
-        vec![Some(5), Some(7), None, Some(0), Some(111)],
-        WordAligned
-    );
-
-    fn_test_single_type!(
-        Date64Array,
-        Date64,
-        vec![Some(5), Some(7), None, Some(0), Some(111)],
-        Compact
+        vec![Some(5), Some(7), None, Some(0), Some(111)]
     );
 
     fn_test_single_type!(
         Date64Array,
         Date64,
-        vec![Some(5), Some(7), None, Some(0), Some(111)],
-        WordAligned
+        vec![Some(5), Some(7), None, Some(0), Some(111)]
     );
-
-    fn_test_single_type!(
-        StringArray,
-        Utf8,
-        vec![Some("hello"), Some("world"), None, Some(""), Some("")],
-        Compact
-    );
-
-    #[test]
-    fn test_single_binary_jit() -> Result<()> {
-        let schema = Arc::new(Schema::new(vec![Field::new("a", Binary, 
true)]));
-        let values: Vec<Option<&[u8]>> =
-            vec![Some(b"one"), Some(b"two"), None, Some(b""), Some(b"three")];
-        let a = BinaryArray::from_opt_vec(values);
-        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?;
-        let mut vector = vec![0; 8192];
-        let assembler = Assembler::default();
-        let row_offsets = {
-            write_batch_unchecked_jit(
-                &mut vector,
-                0,
-                &batch,
-                0,
-                schema.clone(),
-                &assembler,
-                Compact,
-            )?
-        };
-        let output_batch =
-            { read_as_batch_jit(&vector, schema, &row_offsets, &assembler, 
Compact)? };
-        assert_eq!(batch, output_batch);
-        Ok(())
-    }
-
-    #[test]
-    fn test_single_binary_jit_null_free() -> Result<()> {
-        let schema = Arc::new(Schema::new(vec![Field::new("a", Binary, 
false)]));
-        let values: Vec<&[u8]> = vec![b"one", b"two", b"", b"three"];
-        let a = BinaryArray::from_vec(values);
-        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?;
-        let mut vector = vec![0; 8192];
-        let assembler = Assembler::default();
-        let row_offsets = {
-            write_batch_unchecked_jit(
-                &mut vector,
-                0,
-                &batch,
-                0,
-                schema.clone(),
-                &assembler,
-                Compact,
-            )?
-        };
-        let output_batch =
-            { read_as_batch_jit(&vector, schema, &row_offsets, &assembler, 
Compact)? };
-        assert_eq!(batch, output_batch);
-        Ok(())
-    }
 }
diff --git a/datafusion/row/src/jit/reader.rs b/datafusion/row/src/jit/reader.rs
index d670354245..5d91de7be3 100644
--- a/datafusion/row/src/jit/reader.rs
+++ b/datafusion/row/src/jit/reader.rs
@@ -18,7 +18,6 @@
 //! Accessing row from raw bytes with JIT
 
 use crate::jit::fn_name;
-use crate::layout::RowType;
 use crate::reader::RowReader;
 use crate::reader::*;
 use crate::reg_fn;
@@ -39,11 +38,10 @@ pub fn read_as_batch_jit(
     schema: Arc<Schema>,
     offsets: &[usize],
     assembler: &Assembler,
-    row_type: RowType,
 ) -> Result<RecordBatch> {
     let row_num = offsets.len();
     let mut output = MutableRecordBatch::new(row_num, schema.clone());
-    let mut row = RowReader::new(&schema, row_type);
+    let mut row = RowReader::new(&schema);
     register_read_functions(assembler)?;
     let gen_func = gen_read_row(&schema, assembler)?;
     let mut jit = assembler.create_jit();
@@ -84,8 +82,6 @@ fn register_read_functions(asm: &Assembler) -> Result<()> {
     reg_fn!(asm, read_field_f64, reader_param.clone(), None);
     reg_fn!(asm, read_field_date32, reader_param.clone(), None);
     reg_fn!(asm, read_field_date64, reader_param.clone(), None);
-    reg_fn!(asm, read_field_utf8, reader_param.clone(), None);
-    reg_fn!(asm, read_field_binary, reader_param.clone(), None);
     reg_fn!(asm, read_field_bool_null_free, reader_param.clone(), None);
     reg_fn!(asm, read_field_u8_null_free, reader_param.clone(), None);
     reg_fn!(asm, read_field_u16_null_free, reader_param.clone(), None);
@@ -98,9 +94,7 @@ fn register_read_functions(asm: &Assembler) -> Result<()> {
     reg_fn!(asm, read_field_f32_null_free, reader_param.clone(), None);
     reg_fn!(asm, read_field_f64_null_free, reader_param.clone(), None);
     reg_fn!(asm, read_field_date32_null_free, reader_param.clone(), None);
-    reg_fn!(asm, read_field_date64_null_free, reader_param.clone(), None);
-    reg_fn!(asm, read_field_utf8_null_free, reader_param.clone(), None);
-    reg_fn!(asm, read_field_binary_null_free, reader_param, None);
+    reg_fn!(asm, read_field_date64_null_free, reader_param, None);
     Ok(())
 }
 
@@ -134,8 +128,6 @@ fn gen_read_row(schema: &Schema, assembler: &Assembler) -> 
Result<GeneratedFunct
                 Float64 => b.call_stmt("read_field_f64", params)?,
                 Date32 => b.call_stmt("read_field_date32", params)?,
                 Date64 => b.call_stmt("read_field_date64", params)?,
-                Utf8 => b.call_stmt("read_field_utf8", params)?,
-                Binary => b.call_stmt("read_field_binary", params)?,
                 _ => unimplemented!(),
             }
         } else {
@@ -153,8 +145,6 @@ fn gen_read_row(schema: &Schema, assembler: &Assembler) -> 
Result<GeneratedFunct
                 Float64 => b.call_stmt("read_field_f64_null_free", params)?,
                 Date32 => b.call_stmt("read_field_date32_null_free", params)?,
                 Date64 => b.call_stmt("read_field_date64_null_free", params)?,
-                Utf8 => b.call_stmt("read_field_utf8_null_free", params)?,
-                Binary => b.call_stmt("read_field_binary_null_free", params)?,
                 _ => unimplemented!(),
             }
         }
diff --git a/datafusion/row/src/jit/writer.rs b/datafusion/row/src/jit/writer.rs
index 38c3bd4114..cb96f34fab 100644
--- a/datafusion/row/src/jit/writer.rs
+++ b/datafusion/row/src/jit/writer.rs
@@ -18,7 +18,6 @@
 //! Reusable JIT version of row writer backed by Vec<u8> to stitch attributes 
together
 
 use crate::jit::fn_name;
-use crate::layout::RowType;
 use crate::reg_fn;
 use crate::schema_null_free;
 use crate::writer::RowWriter;
@@ -44,9 +43,8 @@ pub fn write_batch_unchecked_jit(
     row_idx: usize,
     schema: Arc<Schema>,
     assembler: &Assembler,
-    row_type: RowType,
 ) -> Result<Vec<usize>> {
-    let mut writer = RowWriter::new(&schema, row_type);
+    let mut writer = RowWriter::new(&schema);
     let mut current_offset = offset;
     let mut offsets = vec![];
     register_write_functions(assembler)?;
@@ -61,7 +59,6 @@ pub fn write_batch_unchecked_jit(
     for cur_row in row_idx..batch.num_rows() {
         offsets.push(current_offset);
         code_fn(&mut writer, cur_row, batch);
-        writer.end_padding();
         let row_width = writer.row_width;
         output[current_offset..current_offset + row_width]
             .copy_from_slice(writer.get_row());
@@ -76,10 +73,9 @@ pub fn write_batch_unchecked_jit(
 pub fn bench_write_batch_jit(
     batches: &[Vec<RecordBatch>],
     schema: Arc<Schema>,
-    row_type: RowType,
 ) -> Result<Vec<usize>> {
     let assembler = Assembler::default();
-    let mut writer = RowWriter::new(&schema, row_type);
+    let mut writer = RowWriter::new(&schema);
     let mut lengths = vec![];
     register_write_functions(&assembler)?;
     let gen_func = gen_write_row(&schema, &assembler)?;
@@ -92,7 +88,6 @@ pub fn bench_write_batch_jit(
     for batch in batches.iter().flatten() {
         for cur_row in 0..batch.num_rows() {
             code_fn(&mut writer, cur_row, batch);
-            writer.end_padding();
             lengths.push(writer.row_width);
             writer.reset()
         }
@@ -123,9 +118,7 @@ fn register_write_functions(asm: &Assembler) -> Result<()> {
     reg_fn!(asm, write_field_f32, reader_param.clone(), None);
     reg_fn!(asm, write_field_f64, reader_param.clone(), None);
     reg_fn!(asm, write_field_date32, reader_param.clone(), None);
-    reg_fn!(asm, write_field_date64, reader_param.clone(), None);
-    reg_fn!(asm, write_field_utf8, reader_param.clone(), None);
-    reg_fn!(asm, write_field_binary, reader_param, None);
+    reg_fn!(asm, write_field_date64, reader_param, None);
     Ok(())
 }
 
@@ -202,8 +195,6 @@ fn write_typed_field_stmt(
         Float64 => b.call_stmt("write_field_f64", params)?,
         Date32 => b.call_stmt("write_field_date32", params)?,
         Date64 => b.call_stmt("write_field_date64", params)?,
-        Utf8 => b.call_stmt("write_field_utf8", params)?,
-        Binary => b.call_stmt("write_field_binary", params)?,
         _ => unimplemented!(),
     }
     Ok(())
diff --git a/datafusion/row/src/layout.rs b/datafusion/row/src/layout.rs
index 6a8e8a78ec..7147132753 100644
--- a/datafusion/row/src/layout.rs
+++ b/datafusion/row/src/layout.rs
@@ -21,64 +21,54 @@ use crate::schema_null_free;
 use arrow::datatypes::{DataType, Schema};
 use arrow::util::bit_util::{ceil, round_upto_power_of_2};
 
-const UTF8_DEFAULT_SIZE: usize = 20;
-const BINARY_DEFAULT_SIZE: usize = 100;
-
-#[derive(Copy, Clone, Debug)]
-/// Type of a RowLayout
-pub enum RowType {
-    /// Stores  each field with minimum bytes for space efficiency.
-    ///
-    /// Its typical use case represents a sorting payload that
-    /// accesses all row fields as a unit.
-    ///
-    /// Each tuple consists of up to three parts: "`null bit set`" ,
-    /// "`values`" and "`var length data`"
-    ///
-    /// The null bit set is used for null tracking and is aligned to 1-byte. 
It stores
-    /// one bit per field.
-    ///
-    /// In the region of the values, we store the fields in the order they are 
defined in the schema.
-    /// - For fixed-length, sequential access fields, we store them directly.
-    ///       E.g., 4 bytes for int and 1 byte for bool.
-    /// - For fixed-length, update often fields, we store one 8-byte word per 
field.
-    /// - For fields of non-primitive or variable-length types,
-    ///       we append their actual content to the end of the var length 
region and
-    ///       store their offset relative to row base and their length, packed 
into an 8-byte word.
-    ///
-    /// ```plaintext
-    /// ┌────────────────┬──────────────────────────┬───────────────────────┐  
      ┌───────────────────────┬────────────┐
-    /// │Validity Bitmask│    Fixed Width Field     │ Variable Width Field  │  
 ...  │     vardata area      │  padding   │
-    /// │ (byte aligned) │   (native type width)    │(vardata offset + len) │  
      │   (variable length)   │   bytes    │
-    /// └────────────────┴──────────────────────────┴───────────────────────┘  
      └───────────────────────┴────────────┘
-    /// ```
-    ///
-    ///  For example, given the schema (Int8, Utf8, Float32, Utf8)
-    ///
-    ///  Encoding the tuple (1, "FooBar", NULL, "baz")
-    ///
-    ///  Requires 32 bytes (31 bytes payload and 1 byte padding to make each 
tuple 8-bytes aligned):
-    ///
-    /// ```plaintext
-    /// 
┌──────────┬──────────┬──────────────────────┬──────────────┬──────────────────────┬───────────────────────┬──────────┐
-    /// │0b00001011│   0x01   │0x00000016  0x00000006│  0x00000000  
│0x0000001C  0x00000003│       FooBarbaz       │   0x00   │
-    /// 
└──────────┴──────────┴──────────────────────┴──────────────┴──────────────────────┴───────────────────────┴──────────┘
-    /// 0          1          2                     10              14         
            22                     31         32
-    /// ```
-    Compact,
-
-    /// This type of layout stores one 8-byte word per field for CPU-friendly,
-    /// It is mainly used to represent the rows with frequently updated 
content,
-    /// for example, grouping state for hash aggregation.
-    WordAligned,
-    // RawComparable,
-}
-
-/// Reveals how the fields of a record are stored in the raw-bytes format
+/// Row layout stores one or multiple 8-byte word(s) per field for CPU-friendly
+/// and efficient processing.
+///
+/// It is mainly used to represent the rows with frequently updated content,
+/// for example, grouping state for hash aggregation.
+///
+/// Each tuple consists of two parts: "`null bit set`" and "`values`".
+///
+/// For null-free tuples, the null bit set can be omitted.
+///
+/// The null bit set, when present, is aligned to 8 bytes. It stores one bit 
per field.
+///
+/// In the region of the values, we store the fields in the order they are 
defined in the schema.
+/// Each field is stored in one or multiple 8-byte words.
+///
+/// ```plaintext
+/// ┌─────────────────┬─────────────────────┐
+/// │Validity Bitmask │      Fields         │
+/// │ (8-byte aligned)│   (8-byte words)    │
+/// └─────────────────┴─────────────────────┘
+/// ```
+///
+///  For example, given the schema (Int8, Float32, Int64) with a null-free 
tuple
+///
+///  Encoding the tuple (1, 3.14, 42)
+///
+///  Requires 24 bytes (3 fields * 8 bytes each):
+///
+/// ```plaintext
+/// ┌──────────────────────┬──────────────────────┬──────────────────────┐
+/// │         0x01         │      0x4048F5C3      │      0x0000002A      │
+/// └──────────────────────┴──────────────────────┴──────────────────────┘
+/// 0                      8                      16                     24
+/// ```
+///
+///  If the schema allows null values and the tuple is (1, NULL, 42)
+///
+///  Encoding the tuple requires 32 bytes (1 * 8 bytes for the null bit set + 
3 fields * 8 bytes each):
+///
+/// ```plaintext
+/// 
┌──────────────────────────┬──────────────────────┬──────────────────────┬──────────────────────┐
+/// │       0b00000101         │         0x01         │      0x00000000      │ 
     0x0000002A      │
+/// │ (7 bytes padding after)  │                      │                      │ 
                     │
+/// 
└──────────────────────────┴──────────────────────┴──────────────────────┴──────────────────────┘
+/// 0                          8                      16                     
24                     32
+/// ```
 #[derive(Debug, Clone)]
 pub struct RowLayout {
-    /// Type of the layout
-    row_type: RowType,
     /// If a row is null free according to its schema
     pub(crate) null_free: bool,
     /// The number of bytes used to store null bits for each field.
@@ -93,27 +83,20 @@ pub struct RowLayout {
 
 impl RowLayout {
     /// new
-    pub fn new(schema: &Schema, row_type: RowType) -> Self {
+    pub fn new(schema: &Schema) -> Self {
         assert!(
-            row_supported(schema, row_type),
-            "{row_type:?}Row with {schema:?} not supported yet.",
+            row_supported(schema),
+            "Row with {schema:?} not supported yet.",
         );
         let null_free = schema_null_free(schema);
         let field_count = schema.fields().len();
         let null_width = if null_free {
             0
         } else {
-            match row_type {
-                RowType::Compact => ceil(field_count, 8),
-                RowType::WordAligned => 
round_upto_power_of_2(ceil(field_count, 8), 8),
-            }
-        };
-        let (field_offsets, values_width) = match row_type {
-            RowType::Compact => compact_offsets(null_width, schema),
-            RowType::WordAligned => word_aligned_offsets(null_width, schema),
+            round_upto_power_of_2(ceil(field_count, 8), 8)
         };
+        let (field_offsets, values_width) = word_aligned_offsets(null_width, 
schema);
         Self {
-            row_type,
             null_free,
             null_width,
             values_width,
@@ -129,36 +112,6 @@ impl RowLayout {
     }
 }
 
-/// Get relative offsets for each field and total width for values
-fn compact_offsets(null_width: usize, schema: &Schema) -> (Vec<usize>, usize) {
-    let mut offsets = vec![];
-    let mut offset = null_width;
-    for f in schema.fields() {
-        offsets.push(offset);
-        offset += compact_type_width(f.data_type());
-    }
-    (offsets, offset - null_width)
-}
-
-fn var_length(dt: &DataType) -> bool {
-    use DataType::*;
-    matches!(dt, Utf8 | Binary)
-}
-
-fn compact_type_width(dt: &DataType) -> usize {
-    use DataType::*;
-    if var_length(dt) {
-        return std::mem::size_of::<u64>();
-    }
-    match dt {
-        Boolean | UInt8 | Int8 => 1,
-        UInt16 | Int16 => 2,
-        UInt32 | Int32 | Float32 | Date32 => 4,
-        UInt64 | Int64 | Float64 | Date64 => 8,
-        _ => unreachable!(),
-    }
-}
-
 fn word_aligned_offsets(null_width: usize, schema: &Schema) -> (Vec<usize>, 
usize) {
     let mut offsets = vec![];
     let mut offset = null_width;
@@ -175,76 +128,30 @@ fn word_aligned_offsets(null_width: usize, schema: 
&Schema) -> (Vec<usize>, usiz
     (offsets, offset - null_width)
 }
 
-/// Estimate row width based on schema
-pub(crate) fn estimate_row_width(schema: &Schema, layout: &RowLayout) -> usize 
{
-    let mut width = layout.fixed_part_width();
-    if matches!(layout.row_type, RowType::WordAligned) {
-        return width;
-    }
-    for f in schema.fields() {
-        match f.data_type() {
-            DataType::Utf8 => width += UTF8_DEFAULT_SIZE,
-            DataType::Binary => width += BINARY_DEFAULT_SIZE,
-            _ => {}
-        }
-    }
-    round_upto_power_of_2(width, 8)
-}
-
 /// Return true of data in `schema` can be converted to raw-bytes
 /// based rows.
 ///
 /// Note all schemas can be supported in the row format
-pub fn row_supported(schema: &Schema, row_type: RowType) -> bool {
-    schema
-        .fields()
-        .iter()
-        .all(|f| supported_type(f.data_type(), row_type))
-}
-
-fn supported_type(dt: &DataType, row_type: RowType) -> bool {
-    use DataType::*;
-
-    match row_type {
-        RowType::Compact => {
-            matches!(
-                dt,
-                Boolean
-                    | UInt8
-                    | UInt16
-                    | UInt32
-                    | UInt64
-                    | Int8
-                    | Int16
-                    | Int32
-                    | Int64
-                    | Float32
-                    | Float64
-                    | Date32
-                    | Date64
-                    | Utf8
-                    | Binary
-            )
-        }
-        // only fixed length types are supported for fast in-place update.
-        RowType::WordAligned => {
-            matches!(
-                dt,
-                Boolean
-                    | UInt8
-                    | UInt16
-                    | UInt32
-                    | UInt64
-                    | Int8
-                    | Int16
-                    | Int32
-                    | Int64
-                    | Float32
-                    | Float64
-                    | Date32
-                    | Date64
-                    | Decimal128(_, _)
-            )
-        }
-    }
+pub fn row_supported(schema: &Schema) -> bool {
+    schema.fields().iter().all(|f| {
+        let dt = f.data_type();
+        use DataType::*;
+        matches!(
+            dt,
+            Boolean
+                | UInt8
+                | UInt16
+                | UInt32
+                | UInt64
+                | Int8
+                | Int16
+                | Int32
+                | Int64
+                | Float32
+                | Float64
+                | Date32
+                | Date64
+                | Decimal128(_, _)
+        )
+    })
 }
diff --git a/datafusion/row/src/lib.rs b/datafusion/row/src/lib.rs
index aab929f270..6d00bb44c8 100644
--- a/datafusion/row/src/lib.rs
+++ b/datafusion/row/src/lib.rs
@@ -36,7 +36,6 @@ use arrow::datatypes::Schema;
 use arrow::error::Result as ArrowResult;
 use arrow::record_batch::RecordBatch;
 pub use layout::row_supported;
-pub use layout::RowType;
 use std::sync::Arc;
 
 pub mod accessor;
@@ -102,7 +101,6 @@ fn get_columns(mut arrays: Vec<Box<dyn ArrayBuilder>>) -> 
Vec<ArrayRef> {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::layout::RowType::{Compact, WordAligned};
     use crate::reader::read_as_batch;
     use crate::writer::write_batch_unchecked;
     use arrow::record_batch::RecordBatch;
@@ -111,33 +109,33 @@ mod tests {
     use DataType::*;
 
     macro_rules! fn_test_single_type {
-        ($ARRAY: ident, $TYPE: expr, $VEC: expr, $ROWTYPE: expr) => {
+        ($ARRAY: ident, $TYPE: expr, $VEC: expr) => {
             paste::item! {
                 #[test]
                 #[allow(non_snake_case)]
-                fn [<test_ $ROWTYPE _single_ $TYPE>]() -> Result<()> {
+                fn [<test _single_ $TYPE>]() -> Result<()> {
                     let schema = Arc::new(Schema::new(vec![Field::new("a", 
$TYPE, true)]));
                     let a = $ARRAY::from($VEC);
                     let batch = RecordBatch::try_new(schema.clone(), 
vec![Arc::new(a)])?;
                     let mut vector = vec![0; 1024];
                     let row_offsets =
-                        { write_batch_unchecked(&mut vector, 0, &batch, 0, 
schema.clone(), $ROWTYPE) };
-                    let output_batch = { read_as_batch(&vector, schema, 
&row_offsets, $ROWTYPE)? };
+                        { write_batch_unchecked(&mut vector, 0, &batch, 0, 
schema.clone()) };
+                    let output_batch = { read_as_batch(&vector, schema, 
&row_offsets)? };
                     assert_eq!(batch, output_batch);
                     Ok(())
                 }
 
                 #[test]
                 #[allow(non_snake_case)]
-                fn [<test_ $ROWTYPE _single_ $TYPE _null_free>]() -> 
Result<()> {
+                fn [<test_single_ $TYPE _null_free>]() -> Result<()> {
                     let schema = Arc::new(Schema::new(vec![Field::new("a", 
$TYPE, false)]));
                     let v = $VEC.into_iter().filter(|o| 
o.is_some()).collect::<Vec<_>>();
                     let a = $ARRAY::from(v);
                     let batch = RecordBatch::try_new(schema.clone(), 
vec![Arc::new(a)])?;
                     let mut vector = vec![0; 1024];
                     let row_offsets =
-                        { write_batch_unchecked(&mut vector, 0, &batch, 0, 
schema.clone(), $ROWTYPE) };
-                    let output_batch = { read_as_batch(&vector, schema, 
&row_offsets, $ROWTYPE)? };
+                        { write_batch_unchecked(&mut vector, 0, &batch, 0, 
schema.clone()) };
+                    let output_batch = { read_as_batch(&vector, schema, 
&row_offsets)? };
                     assert_eq!(batch, output_batch);
                     Ok(())
                 }
@@ -148,229 +146,89 @@ mod tests {
     fn_test_single_type!(
         BooleanArray,
         Boolean,
-        vec![Some(true), Some(false), None, Some(true), None],
-        Compact
-    );
-
-    fn_test_single_type!(
-        BooleanArray,
-        Boolean,
-        vec![Some(true), Some(false), None, Some(true), None],
-        WordAligned
-    );
-
-    fn_test_single_type!(
-        Int8Array,
-        Int8,
-        vec![Some(5), Some(7), None, Some(0), Some(111)],
-        Compact
+        vec![Some(true), Some(false), None, Some(true), None]
     );
 
     fn_test_single_type!(
         Int8Array,
         Int8,
-        vec![Some(5), Some(7), None, Some(0), Some(111)],
-        WordAligned
+        vec![Some(5), Some(7), None, Some(0), Some(111)]
     );
 
     fn_test_single_type!(
         Int16Array,
         Int16,
-        vec![Some(5), Some(7), None, Some(0), Some(111)],
-        Compact
-    );
-
-    fn_test_single_type!(
-        Int16Array,
-        Int16,
-        vec![Some(5), Some(7), None, Some(0), Some(111)],
-        WordAligned
+        vec![Some(5), Some(7), None, Some(0), Some(111)]
     );
 
     fn_test_single_type!(
         Int32Array,
         Int32,
-        vec![Some(5), Some(7), None, Some(0), Some(111)],
-        Compact
-    );
-
-    fn_test_single_type!(
-        Int32Array,
-        Int32,
-        vec![Some(5), Some(7), None, Some(0), Some(111)],
-        WordAligned
+        vec![Some(5), Some(7), None, Some(0), Some(111)]
     );
 
     fn_test_single_type!(
         Int64Array,
         Int64,
-        vec![Some(5), Some(7), None, Some(0), Some(111)],
-        Compact
-    );
-
-    fn_test_single_type!(
-        Int64Array,
-        Int64,
-        vec![Some(5), Some(7), None, Some(0), Some(111)],
-        WordAligned
-    );
-
-    fn_test_single_type!(
-        UInt8Array,
-        UInt8,
-        vec![Some(5), Some(7), None, Some(0), Some(111)],
-        Compact
+        vec![Some(5), Some(7), None, Some(0), Some(111)]
     );
 
     fn_test_single_type!(
         UInt8Array,
         UInt8,
-        vec![Some(5), Some(7), None, Some(0), Some(111)],
-        WordAligned
+        vec![Some(5), Some(7), None, Some(0), Some(111)]
     );
 
     fn_test_single_type!(
         UInt16Array,
         UInt16,
-        vec![Some(5), Some(7), None, Some(0), Some(111)],
-        Compact
-    );
-
-    fn_test_single_type!(
-        UInt16Array,
-        UInt16,
-        vec![Some(5), Some(7), None, Some(0), Some(111)],
-        WordAligned
-    );
-
-    fn_test_single_type!(
-        UInt32Array,
-        UInt32,
-        vec![Some(5), Some(7), None, Some(0), Some(111)],
-        Compact
+        vec![Some(5), Some(7), None, Some(0), Some(111)]
     );
 
     fn_test_single_type!(
         UInt32Array,
         UInt32,
-        vec![Some(5), Some(7), None, Some(0), Some(111)],
-        WordAligned
-    );
-
-    fn_test_single_type!(
-        UInt64Array,
-        UInt64,
-        vec![Some(5), Some(7), None, Some(0), Some(111)],
-        Compact
+        vec![Some(5), Some(7), None, Some(0), Some(111)]
     );
 
     fn_test_single_type!(
         UInt64Array,
         UInt64,
-        vec![Some(5), Some(7), None, Some(0), Some(111)],
-        WordAligned
+        vec![Some(5), Some(7), None, Some(0), Some(111)]
     );
 
     fn_test_single_type!(
         Float32Array,
         Float32,
-        vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)],
-        Compact
-    );
-
-    fn_test_single_type!(
-        Float32Array,
-        Float32,
-        vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)],
-        WordAligned
+        vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)]
     );
 
     fn_test_single_type!(
         Float64Array,
         Float64,
-        vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)],
-        Compact
-    );
-
-    fn_test_single_type!(
-        Float64Array,
-        Float64,
-        vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)],
-        WordAligned
+        vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)]
     );
 
     fn_test_single_type!(
         Date32Array,
         Date32,
-        vec![Some(5), Some(7), None, Some(0), Some(111)],
-        Compact
-    );
-
-    fn_test_single_type!(
-        Date32Array,
-        Date32,
-        vec![Some(5), Some(7), None, Some(0), Some(111)],
-        WordAligned
-    );
-
-    fn_test_single_type!(
-        Date64Array,
-        Date64,
-        vec![Some(5), Some(7), None, Some(0), Some(111)],
-        Compact
+        vec![Some(5), Some(7), None, Some(0), Some(111)]
     );
 
     fn_test_single_type!(
         Date64Array,
         Date64,
-        vec![Some(5), Some(7), None, Some(0), Some(111)],
-        WordAligned
-    );
-
-    fn_test_single_type!(
-        StringArray,
-        Utf8,
-        vec![Some("hello"), Some("world"), None, Some(""), Some("")],
-        Compact
+        vec![Some(5), Some(7), None, Some(0), Some(111)]
     );
 
     #[test]
     #[should_panic(expected = "not supported yet")]
-    fn test_unsupported_word_aligned_type() {
+    fn test_unsupported_type() {
         let a: ArrayRef = Arc::new(StringArray::from(vec!["hello", "world"]));
         let batch = RecordBatch::try_from_iter(vec![("a", a)]).unwrap();
         let schema = batch.schema();
         let mut vector = vec![0; 1024];
-        write_batch_unchecked(&mut vector, 0, &batch, 0, schema, WordAligned);
-    }
-
-    #[test]
-    fn test_single_binary() -> Result<()> {
-        let schema = Arc::new(Schema::new(vec![Field::new("a", Binary, 
true)]));
-        let values: Vec<Option<&[u8]>> =
-            vec![Some(b"one"), Some(b"two"), None, Some(b""), Some(b"three")];
-        let a = BinaryArray::from_opt_vec(values);
-        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?;
-        let mut vector = vec![0; 8192];
-        let row_offsets =
-            { write_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone(), 
Compact) };
-        let output_batch = { read_as_batch(&vector, schema, &row_offsets, 
Compact)? };
-        assert_eq!(batch, output_batch);
-        Ok(())
-    }
-
-    #[test]
-    fn test_single_binary_null_free() -> Result<()> {
-        let schema = Arc::new(Schema::new(vec![Field::new("a", Binary, 
false)]));
-        let values: Vec<&[u8]> = vec![b"one", b"two", b"", b"three"];
-        let a = BinaryArray::from_vec(values);
-        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?;
-        let mut vector = vec![0; 8192];
-        let row_offsets =
-            { write_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone(), 
Compact) };
-        let output_batch = { read_as_batch(&vector, schema, &row_offsets, 
Compact)? };
-        assert_eq!(batch, output_batch);
-        Ok(())
+        write_batch_unchecked(&mut vector, 0, &batch, 0, schema);
     }
 
     #[test]
@@ -380,16 +238,15 @@ mod tests {
         let batch = RecordBatch::try_from_iter(vec![("a", a)]).unwrap();
         let schema = batch.schema();
         let mut vector = vec![0; 1024];
-        write_batch_unchecked(&mut vector, 0, &batch, 0, schema, Compact);
+        write_batch_unchecked(&mut vector, 0, &batch, 0, schema);
     }
 
     #[test]
     #[should_panic(expected = "not supported yet")]
     fn test_unsupported_type_read() {
-        let schema =
-            Arc::new(Schema::new(vec![Field::new("a", Decimal128(5, 2), 
false)]));
+        let schema = Arc::new(Schema::new(vec![Field::new("a", Utf8, false)]));
         let vector = vec![0; 1024];
         let row_offsets = vec![0];
-        read_as_batch(&vector, schema, &row_offsets, Compact).unwrap();
+        read_as_batch(&vector, schema, &row_offsets).unwrap();
     }
 }
diff --git a/datafusion/row/src/reader.rs b/datafusion/row/src/reader.rs
index a8dc8211f0..10c9896df7 100644
--- a/datafusion/row/src/reader.rs
+++ b/datafusion/row/src/reader.rs
@@ -17,7 +17,7 @@
 
 //! [`read_as_batch`] converts raw bytes to [`RecordBatch`]
 
-use crate::layout::{RowLayout, RowType};
+use crate::layout::RowLayout;
 use crate::validity::{all_valid, NullBitsFormatter};
 use crate::MutableRecordBatch;
 use arrow::array::*;
@@ -47,11 +47,10 @@ pub fn read_as_batch(
     data: &[u8],
     schema: Arc<Schema>,
     offsets: &[usize],
-    row_type: RowType,
 ) -> Result<RecordBatch> {
     let row_num = offsets.len();
     let mut output = MutableRecordBatch::new(row_num, schema.clone());
-    let mut row = RowReader::new(&schema, row_type);
+    let mut row = RowReader::new(&schema);
 
     for offset in offsets.iter().take(row_num) {
         row.point_to(*offset, data);
@@ -129,9 +128,9 @@ impl<'a> std::fmt::Debug for RowReader<'a> {
 
 impl<'a> RowReader<'a> {
     /// new
-    pub fn new(schema: &Schema, row_type: RowType) -> Self {
+    pub fn new(schema: &Schema) -> Self {
         Self {
-            layout: RowLayout::new(schema, row_type),
+            layout: RowLayout::new(schema),
             data: &[],
             base_offset: 0,
         }
@@ -217,25 +216,6 @@ impl<'a> RowReader<'a> {
         get_idx!(i128, self, idx, 16)
     }
 
-    fn get_utf8(&self, idx: usize) -> &str {
-        self.assert_index_valid(idx);
-        let offset_size = self.get_u64(idx);
-        let offset = (offset_size >> 32) as usize;
-        let len = (offset_size & 0xffff_ffff) as usize;
-        let varlena_offset = self.base_offset + offset;
-        let bytes = &self.data[varlena_offset..varlena_offset + len];
-        unsafe { std::str::from_utf8_unchecked(bytes) }
-    }
-
-    fn get_binary(&self, idx: usize) -> &[u8] {
-        self.assert_index_valid(idx);
-        let offset_size = self.get_u64(idx);
-        let offset = (offset_size >> 32) as usize;
-        let len = (offset_size & 0xffff_ffff) as usize;
-        let varlena_offset = self.base_offset + offset;
-        &self.data[varlena_offset..varlena_offset + len]
-    }
-
     fn_get_idx_opt!(bool);
     fn_get_idx_opt!(u8);
     fn_get_idx_opt!(u16);
@@ -271,14 +251,6 @@ impl<'a> RowReader<'a> {
             None
         }
     }
-
-    fn get_utf8_opt(&self, idx: usize) -> Option<&str> {
-        if self.is_valid_at(idx) {
-            Some(self.get_utf8(idx))
-        } else {
-            None
-        }
-    }
 }
 
 /// Read the row currently pointed by RowWriter to the output columnar batch 
buffer
@@ -339,31 +311,8 @@ fn_read_field!(f32, Float32Builder);
 fn_read_field!(f64, Float64Builder);
 fn_read_field!(date32, Date32Builder);
 fn_read_field!(date64, Date64Builder);
-fn_read_field!(utf8, StringBuilder);
 fn_read_field!(decimal128, Decimal128Builder);
 
-pub(crate) fn read_field_binary(
-    to: &mut Box<dyn ArrayBuilder>,
-    col_idx: usize,
-    row: &RowReader,
-) {
-    let to = to.as_any_mut().downcast_mut::<BinaryBuilder>().unwrap();
-    if row.is_valid_at(col_idx) {
-        to.append_value(row.get_binary(col_idx));
-    } else {
-        to.append_null();
-    }
-}
-
-pub(crate) fn read_field_binary_null_free(
-    to: &mut Box<dyn ArrayBuilder>,
-    col_idx: usize,
-    row: &RowReader,
-) {
-    let to = to.as_any_mut().downcast_mut::<BinaryBuilder>().unwrap();
-    to.append_value(row.get_binary(col_idx));
-}
-
 fn read_field(
     to: &mut Box<dyn ArrayBuilder>,
     dt: &DataType,
@@ -385,8 +334,6 @@ fn read_field(
         Float64 => read_field_f64(to, col_idx, row),
         Date32 => read_field_date32(to, col_idx, row),
         Date64 => read_field_date64(to, col_idx, row),
-        Utf8 => read_field_utf8(to, col_idx, row),
-        Binary => read_field_binary(to, col_idx, row),
         Decimal128(_, _) => read_field_decimal128(to, col_idx, row),
         _ => unimplemented!(),
     }
@@ -413,8 +360,6 @@ fn read_field_null_free(
         Float64 => read_field_f64_null_free(to, col_idx, row),
         Date32 => read_field_date32_null_free(to, col_idx, row),
         Date64 => read_field_date64_null_free(to, col_idx, row),
-        Utf8 => read_field_utf8_null_free(to, col_idx, row),
-        Binary => read_field_binary_null_free(to, col_idx, row),
         Decimal128(_, _) => read_field_decimal128_null_free(to, col_idx, row),
         _ => unimplemented!(),
     }
diff --git a/datafusion/row/src/writer.rs b/datafusion/row/src/writer.rs
index 7bf9ac0267..14ce6afe68 100644
--- a/datafusion/row/src/writer.rs
+++ b/datafusion/row/src/writer.rs
@@ -17,17 +17,13 @@
 
 //! [`RowWriter`] writes [`RecordBatch`]es to `Vec<u8>` to stitch attributes 
together
 
-use crate::layout::{estimate_row_width, RowLayout, RowType};
+use crate::layout::RowLayout;
 use arrow::array::*;
 use arrow::datatypes::{DataType, Schema};
 use arrow::record_batch::RecordBatch;
-use arrow::util::bit_util::{round_upto_power_of_2, set_bit_raw, unset_bit_raw};
-use datafusion_common::cast::{
-    as_binary_array, as_date32_array, as_date64_array, as_decimal128_array,
-    as_string_array,
-};
+use arrow::util::bit_util::{set_bit_raw, unset_bit_raw};
+use datafusion_common::cast::{as_date32_array, as_date64_array, 
as_decimal128_array};
 use datafusion_common::Result;
-use std::cmp::max;
 use std::sync::Arc;
 
 /// Append batch from `row_idx` to `output` buffer start from `offset`
@@ -40,9 +36,8 @@ pub fn write_batch_unchecked(
     batch: &RecordBatch,
     row_idx: usize,
     schema: Arc<Schema>,
-    row_type: RowType,
 ) -> Vec<usize> {
-    let mut writer = RowWriter::new(&schema, row_type);
+    let mut writer = RowWriter::new(&schema);
     let mut current_offset = offset;
     let mut offsets = vec![];
     let columns = batch.columns();
@@ -62,9 +57,8 @@ pub fn write_batch_unchecked(
 pub fn bench_write_batch(
     batches: &[Vec<RecordBatch>],
     schema: Arc<Schema>,
-    row_type: RowType,
 ) -> Result<Vec<usize>> {
-    let mut writer = RowWriter::new(&schema, row_type);
+    let mut writer = RowWriter::new(&schema);
     let mut lengths = vec![];
 
     for batch in batches.iter().flatten() {
@@ -125,33 +119,24 @@ pub struct RowWriter {
     data: Vec<u8>,
     /// Length in bytes for the current tuple, 8-bytes word aligned.
     pub(crate) row_width: usize,
-    /// Length in bytes for `variable length data` part of the current tuple.
-    varlena_width: usize,
-    /// Current offset for the next variable length field to write to.
-    varlena_offset: usize,
 }
 
 impl RowWriter {
     /// New
-    pub fn new(schema: &Schema, row_type: RowType) -> Self {
-        let layout = RowLayout::new(schema, row_type);
-        let init_capacity = estimate_row_width(schema, &layout);
-        let varlena_offset = layout.fixed_part_width();
+    pub fn new(schema: &Schema) -> Self {
+        let layout = RowLayout::new(schema);
+        let init_capacity = layout.fixed_part_width();
         Self {
             layout,
             data: vec![0; init_capacity],
-            row_width: 0,
-            varlena_width: 0,
-            varlena_offset,
+            row_width: init_capacity,
         }
     }
 
     /// Reset the row writer state for new tuple
     pub fn reset(&mut self) {
         self.data.fill(0);
-        self.row_width = 0;
-        self.varlena_width = 0;
-        self.varlena_offset = self.layout.fixed_part_width();
+        self.row_width = self.layout.fixed_part_width();
     }
 
     #[inline]
@@ -230,45 +215,6 @@ impl RowWriter {
         set_idx!(16, self, idx, value)
     }
 
-    fn set_offset_size(&mut self, idx: usize, size: u32) {
-        let offset_and_size: u64 = (self.varlena_offset as u64) << 32 | (size 
as u64);
-        self.set_u64(idx, offset_and_size);
-    }
-
-    fn set_utf8(&mut self, idx: usize, value: &str) {
-        self.assert_index_valid(idx);
-        let bytes = value.as_bytes();
-        let size = bytes.len();
-        self.set_offset_size(idx, size as u32);
-        let varlena_offset = self.varlena_offset;
-        self.data[varlena_offset..varlena_offset + 
size].copy_from_slice(bytes);
-        self.varlena_offset += size;
-        self.varlena_width += size;
-    }
-
-    fn set_binary(&mut self, idx: usize, value: &[u8]) {
-        self.assert_index_valid(idx);
-        let size = value.len();
-        self.set_offset_size(idx, size as u32);
-        let varlena_offset = self.varlena_offset;
-        self.data[varlena_offset..varlena_offset + 
size].copy_from_slice(value);
-        self.varlena_offset += size;
-        self.varlena_width += size;
-    }
-
-    fn current_width(&self) -> usize {
-        self.layout.fixed_part_width() + self.varlena_width
-    }
-
-    /// End each row at 8-byte word boundary.
-    pub(crate) fn end_padding(&mut self) {
-        let payload_width = self.current_width();
-        self.row_width = round_upto_power_of_2(payload_width, 8);
-        if self.data.len() < self.row_width {
-            self.data.resize(self.row_width, 0);
-        }
-    }
-
     /// Get raw bytes
     pub fn get_row(&self) -> &[u8] {
         &self.data[0..self.row_width]
@@ -298,7 +244,6 @@ pub fn write_row(
         }
     }
 
-    row_writer.end_padding();
     row_writer.row_width
 }
 
@@ -350,36 +295,6 @@ pub(crate) fn write_field_date64(
     to.set_date64(col_idx, from.value(row_idx));
 }
 
-pub(crate) fn write_field_utf8(
-    to: &mut RowWriter,
-    from: &Arc<dyn Array>,
-    col_idx: usize,
-    row_idx: usize,
-) {
-    let from = as_string_array(from).unwrap();
-    let s = from.value(row_idx);
-    let new_width = to.current_width() + s.as_bytes().len();
-    if new_width > to.data.len() {
-        to.data.resize(max(to.data.capacity(), new_width), 0);
-    }
-    to.set_utf8(col_idx, s);
-}
-
-pub(crate) fn write_field_binary(
-    to: &mut RowWriter,
-    from: &Arc<dyn Array>,
-    col_idx: usize,
-    row_idx: usize,
-) {
-    let from = as_binary_array(from).unwrap();
-    let s = from.value(row_idx);
-    let new_width = to.current_width() + s.len();
-    if new_width > to.data.len() {
-        to.data.resize(max(to.data.capacity(), new_width), 0);
-    }
-    to.set_binary(col_idx, s);
-}
-
 pub(crate) fn write_field_decimal128(
     to: &mut RowWriter,
     from: &Arc<dyn Array>,
@@ -412,8 +327,6 @@ fn write_field(
         Float64 => write_field_f64(row, col, col_idx, row_idx),
         Date32 => write_field_date32(row, col, col_idx, row_idx),
         Date64 => write_field_date64(row, col, col_idx, row_idx),
-        Utf8 => write_field_utf8(row, col, col_idx, row_idx),
-        Binary => write_field_binary(row, col, col_idx, row_idx),
         Decimal128(_, _) => write_field_decimal128(row, col, col_idx, row_idx),
         _ => unimplemented!(),
     }

Reply via email to