This is an automated email from the ASF dual-hosted git repository.
yjshen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 463ec30743 feat: Remove compact row since it's no longer used (#6021)
463ec30743 is described below
commit 463ec3074318c5bf31a6a1377ab5f01159d8a2ce
Author: Yijie Shen <[email protected]>
AuthorDate: Mon Apr 17 21:08:35 2023 +0800
feat: Remove compact row since it's no longer used (#6021)
* Remove compact row since it's no longer used
* rm
* fmt
* clippy
---
datafusion/core/benches/jit.rs | 28 +--
.../core/src/physical_plan/aggregates/row_hash.rs | 20 +-
datafusion/core/tests/row.rs | 29 +--
datafusion/row/src/accessor.rs | 6 +-
datafusion/row/src/jit/mod.rs | 203 ++---------------
datafusion/row/src/jit/reader.rs | 14 +-
datafusion/row/src/jit/writer.rs | 15 +-
datafusion/row/src/layout.rs | 239 +++++++--------------
datafusion/row/src/lib.rs | 193 +++--------------
datafusion/row/src/reader.rs | 63 +-----
datafusion/row/src/writer.rs | 107 +--------
11 files changed, 153 insertions(+), 764 deletions(-)
diff --git a/datafusion/core/benches/jit.rs b/datafusion/core/benches/jit.rs
index 0c6de319d2..d42df8e033 100644
--- a/datafusion/core/benches/jit.rs
+++ b/datafusion/core/benches/jit.rs
@@ -25,7 +25,6 @@ use crate::criterion::Criterion;
use crate::data_utils::{create_record_batches, create_schema};
use datafusion::row::jit::writer::bench_write_batch_jit;
use datafusion::row::writer::bench_write_batch;
-use datafusion::row::RowType;
use std::sync::Arc;
fn criterion_benchmark(c: &mut Criterion) {
@@ -37,38 +36,15 @@ fn criterion_benchmark(c: &mut Criterion) {
let batches =
create_record_batches(schema.clone(), array_len, partitions_len,
batch_size);
- c.bench_function("compact row serializer", |b| {
- b.iter(|| {
- criterion::black_box(
- bench_write_batch(&batches, schema.clone(),
RowType::Compact).unwrap(),
- )
- })
- });
-
c.bench_function("word aligned row serializer", |b| {
b.iter(|| {
- criterion::black_box(
- bench_write_batch(&batches, schema.clone(),
RowType::WordAligned)
- .unwrap(),
- )
- })
- });
-
- c.bench_function("compact row serializer jit", |b| {
- b.iter(|| {
- criterion::black_box(
- bench_write_batch_jit(&batches, schema.clone(),
RowType::Compact)
- .unwrap(),
- )
+ criterion::black_box(bench_write_batch(&batches,
schema.clone()).unwrap())
})
});
c.bench_function("word aligned row serializer jit", |b| {
b.iter(|| {
- criterion::black_box(
- bench_write_batch_jit(&batches, schema.clone(),
RowType::WordAligned)
- .unwrap(),
- )
+ criterion::black_box(bench_write_batch_jit(&batches,
schema.clone()).unwrap())
})
});
}
diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash.rs
b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
index 9a75da02fb..d9e42e478d 100644
--- a/datafusion/core/src/physical_plan/aggregates/row_hash.rs
+++ b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
@@ -51,13 +51,13 @@ use datafusion_expr::Accumulator;
use datafusion_row::accessor::RowAccessor;
use datafusion_row::layout::RowLayout;
use datafusion_row::reader::{read_row, RowReader};
-use datafusion_row::{MutableRecordBatch, RowType};
+use datafusion_row::MutableRecordBatch;
use hashbrown::raw::RawTable;
/// Grouping aggregate with row-format aggregation states inside.
///
/// For each aggregation entry, we use:
-/// - [Compact] row represents grouping keys for fast hash computation and
comparison directly on raw bytes.
+/// - [Arrow-row] represents grouping keys for fast hash computation and
comparison directly on raw bytes.
/// - [WordAligned] row to store aggregation state, designed to be
CPU-friendly when updates over every field are frequent.
///
/// The architecture is the following:
@@ -68,8 +68,8 @@ use hashbrown::raw::RawTable;
/// 4. The state's RecordBatch is `merge`d to a new state
/// 5. The state is mapped to the final value
///
-/// [Compact]: datafusion_row::layout::RowType::Compact
-/// [WordAligned]: datafusion_row::layout::RowType::WordAligned
+/// [Arrow-row]: OwnedRow
+/// [WordAligned]: datafusion_row::layout
pub(crate) struct GroupedHashAggregateStream {
schema: SchemaRef,
input: SendableRecordBatchStream,
@@ -203,8 +203,7 @@ impl GroupedHashAggregateStream {
.collect(),
)?;
- let row_aggr_layout =
- Arc::new(RowLayout::new(&row_aggr_schema, RowType::WordAligned));
+ let row_aggr_layout = Arc::new(RowLayout::new(&row_aggr_schema));
let name = format!("GroupedHashAggregateStream[{partition}]");
let aggr_state = AggregationState {
@@ -632,15 +631,14 @@ impl GroupedHashAggregateStream {
// Store row accumulator results (either final output or intermediate
state):
let row_columns = match self.mode {
AggregateMode::Partial => {
- read_as_batch(&state_buffers, &self.row_aggr_schema,
RowType::WordAligned)
+ read_as_batch(&state_buffers, &self.row_aggr_schema)
}
AggregateMode::Final
| AggregateMode::FinalPartitioned
| AggregateMode::Single => {
let mut results = vec![];
for (idx, acc) in self.row_accumulators.iter().enumerate() {
- let mut state_accessor =
- RowAccessor::new(&self.row_aggr_schema,
RowType::WordAligned);
+ let mut state_accessor =
RowAccessor::new(&self.row_aggr_schema);
let current = state_buffers
.iter_mut()
.map(|buffer| {
@@ -727,10 +725,10 @@ impl GroupedHashAggregateStream {
}
}
-fn read_as_batch(rows: &[Vec<u8>], schema: &Schema, row_type: RowType) ->
Vec<ArrayRef> {
+fn read_as_batch(rows: &[Vec<u8>], schema: &Schema) -> Vec<ArrayRef> {
let row_num = rows.len();
let mut output = MutableRecordBatch::new(row_num,
Arc::new(schema.clone()));
- let mut row = RowReader::new(schema, row_type);
+ let mut row = RowReader::new(schema);
for data in rows {
row.point_to(0, data);
diff --git a/datafusion/core/tests/row.rs b/datafusion/core/tests/row.rs
index 5eeb237e18..55310bb611 100644
--- a/datafusion/core/tests/row.rs
+++ b/datafusion/core/tests/row.rs
@@ -23,7 +23,6 @@ use datafusion::execution::context::SessionState;
use datafusion::physical_plan::file_format::FileScanConfig;
use datafusion::physical_plan::{collect, ExecutionPlan};
use datafusion::prelude::SessionContext;
-use datafusion_row::layout::RowType::{Compact, WordAligned};
use datafusion_row::reader::read_as_batch;
use datafusion_row::writer::write_batch_unchecked;
use object_store::{local::LocalFileSystem, path::Path, ObjectStore};
@@ -31,29 +30,6 @@ use std::sync::Arc;
#[tokio::test]
async fn test_with_parquet() -> Result<()> {
- let ctx = SessionContext::new();
- let state = ctx.state();
- let task_ctx = state.task_ctx();
- let projection = Some(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
- let exec =
- get_exec(&state, "alltypes_plain.parquet", projection.as_ref(),
None).await?;
- let schema = exec.schema().clone();
-
- let batches = collect(exec, task_ctx).await?;
- assert_eq!(1, batches.len());
- let batch = &batches[0];
-
- let mut vector = vec![0; 20480];
- let row_offsets =
- { write_batch_unchecked(&mut vector, 0, batch, 0, schema.clone(),
Compact) };
- let output_batch = { read_as_batch(&vector, schema, &row_offsets,
Compact)? };
- assert_eq!(*batch, output_batch);
-
- Ok(())
-}
-
-#[tokio::test]
-async fn test_with_parquet_word_aligned() -> Result<()> {
let ctx = SessionContext::new();
let state = ctx.state();
let task_ctx = state.task_ctx();
@@ -67,9 +43,8 @@ async fn test_with_parquet_word_aligned() -> Result<()> {
let batch = &batches[0];
let mut vector = vec![0; 20480];
- let row_offsets =
- { write_batch_unchecked(&mut vector, 0, batch, 0, schema.clone(),
WordAligned) };
- let output_batch = { read_as_batch(&vector, schema, &row_offsets,
WordAligned)? };
+ let row_offsets = { write_batch_unchecked(&mut vector, 0, batch, 0,
schema.clone()) };
+ let output_batch = { read_as_batch(&vector, schema, &row_offsets)? };
assert_eq!(*batch, output_batch);
Ok(())
diff --git a/datafusion/row/src/accessor.rs b/datafusion/row/src/accessor.rs
index e7b4ed8501..bba44f0e56 100644
--- a/datafusion/row/src/accessor.rs
+++ b/datafusion/row/src/accessor.rs
@@ -17,7 +17,7 @@
//! [`RowAccessor`] provides a Read/Write/Modify access for row with all
fixed-sized fields:
-use crate::layout::{RowLayout, RowType};
+use crate::layout::RowLayout;
use crate::validity::NullBitsFormatter;
use crate::{fn_get_idx, fn_get_idx_opt, fn_set_idx};
use arrow::datatypes::{DataType, Schema};
@@ -116,9 +116,9 @@ macro_rules! fn_get_idx_scalar {
impl<'a> RowAccessor<'a> {
/// new
- pub fn new(schema: &Schema, row_type: RowType) -> Self {
+ pub fn new(schema: &Schema) -> Self {
Self {
- layout: Arc::new(RowLayout::new(schema, row_type)),
+ layout: Arc::new(RowLayout::new(schema)),
data: &mut [],
base_offset: 0,
}
diff --git a/datafusion/row/src/jit/mod.rs b/datafusion/row/src/jit/mod.rs
index 03b8eed186..803c96b176 100644
--- a/datafusion/row/src/jit/mod.rs
+++ b/datafusion/row/src/jit/mod.rs
@@ -45,7 +45,6 @@ fn fn_name<T>(f: T) -> &'static str {
mod tests {
use crate::jit::reader::read_as_batch_jit;
use crate::jit::writer::write_batch_unchecked_jit;
- use crate::layout::RowType::{Compact, WordAligned};
use arrow::record_batch::RecordBatch;
use arrow::{array::*, datatypes::*};
use datafusion_common::Result;
@@ -54,26 +53,26 @@ mod tests {
use DataType::*;
macro_rules! fn_test_single_type {
- ($ARRAY: ident, $TYPE: expr, $VEC: expr, $ROWTYPE: expr) => {
+ ($ARRAY: ident, $TYPE: expr, $VEC: expr) => {
paste::item! {
#[test]
#[allow(non_snake_case)]
- fn [<test_ $ROWTYPE _single_ $TYPE _jit>]() -> Result<()> {
+ fn [<test_single_ $TYPE _jit>]() -> Result<()> {
let schema = Arc::new(Schema::new(vec![Field::new("a",
$TYPE, true)]));
let a = $ARRAY::from($VEC);
let batch = RecordBatch::try_new(schema.clone(),
vec![Arc::new(a)])?;
let mut vector = vec![0; 1024];
let assembler = Assembler::default();
let row_offsets =
- { write_batch_unchecked_jit(&mut vector, 0, &batch, 0,
schema.clone(), &assembler, $ROWTYPE)? };
- let output_batch = { read_as_batch_jit(&vector, schema,
&row_offsets, &assembler, $ROWTYPE)? };
+ { write_batch_unchecked_jit(&mut vector, 0, &batch, 0,
schema.clone(), &assembler)? };
+ let output_batch = { read_as_batch_jit(&vector, schema,
&row_offsets, &assembler)? };
assert_eq!(batch, output_batch);
Ok(())
}
#[test]
#[allow(non_snake_case)]
- fn [<test_ $ROWTYPE _single_ $TYPE _jit_null_free>]() ->
Result<()> {
+ fn [<test_single_ $TYPE _jit_null_free>]() -> Result<()> {
let schema = Arc::new(Schema::new(vec![Field::new("a",
$TYPE, false)]));
let v = $VEC.into_iter().filter(|o|
o.is_some()).collect::<Vec<_>>();
let a = $ARRAY::from(v);
@@ -81,8 +80,8 @@ mod tests {
let mut vector = vec![0; 1024];
let assembler = Assembler::default();
let row_offsets =
- { write_batch_unchecked_jit(&mut vector, 0, &batch, 0,
schema.clone(), &assembler, $ROWTYPE)? };
- let output_batch = { read_as_batch_jit(&vector, schema,
&row_offsets, &assembler, $ROWTYPE)? };
+ { write_batch_unchecked_jit(&mut vector, 0, &batch, 0,
schema.clone(), &assembler)? };
+ let output_batch = { read_as_batch_jit(&vector, schema,
&row_offsets, &assembler)? };
assert_eq!(batch, output_batch);
Ok(())
}
@@ -93,240 +92,78 @@ mod tests {
fn_test_single_type!(
BooleanArray,
Boolean,
- vec![Some(true), Some(false), None, Some(true), None],
- Compact
- );
-
- fn_test_single_type!(
- BooleanArray,
- Boolean,
- vec![Some(true), Some(false), None, Some(true), None],
- WordAligned
- );
-
- fn_test_single_type!(
- Int8Array,
- Int8,
- vec![Some(5), Some(7), None, Some(0), Some(111)],
- Compact
+ vec![Some(true), Some(false), None, Some(true), None]
);
fn_test_single_type!(
Int8Array,
Int8,
- vec![Some(5), Some(7), None, Some(0), Some(111)],
- WordAligned
+ vec![Some(5), Some(7), None, Some(0), Some(111)]
);
fn_test_single_type!(
Int16Array,
Int16,
- vec![Some(5), Some(7), None, Some(0), Some(111)],
- Compact
- );
-
- fn_test_single_type!(
- Int16Array,
- Int16,
- vec![Some(5), Some(7), None, Some(0), Some(111)],
- WordAligned
+ vec![Some(5), Some(7), None, Some(0), Some(111)]
);
fn_test_single_type!(
Int32Array,
Int32,
- vec![Some(5), Some(7), None, Some(0), Some(111)],
- Compact
- );
-
- fn_test_single_type!(
- Int32Array,
- Int32,
- vec![Some(5), Some(7), None, Some(0), Some(111)],
- WordAligned
+ vec![Some(5), Some(7), None, Some(0), Some(111)]
);
fn_test_single_type!(
Int64Array,
Int64,
- vec![Some(5), Some(7), None, Some(0), Some(111)],
- Compact
- );
-
- fn_test_single_type!(
- Int64Array,
- Int64,
- vec![Some(5), Some(7), None, Some(0), Some(111)],
- WordAligned
- );
-
- fn_test_single_type!(
- UInt8Array,
- UInt8,
- vec![Some(5), Some(7), None, Some(0), Some(111)],
- Compact
+ vec![Some(5), Some(7), None, Some(0), Some(111)]
);
fn_test_single_type!(
UInt8Array,
UInt8,
- vec![Some(5), Some(7), None, Some(0), Some(111)],
- WordAligned
+ vec![Some(5), Some(7), None, Some(0), Some(111)]
);
fn_test_single_type!(
UInt16Array,
UInt16,
- vec![Some(5), Some(7), None, Some(0), Some(111)],
- Compact
- );
-
- fn_test_single_type!(
- UInt16Array,
- UInt16,
- vec![Some(5), Some(7), None, Some(0), Some(111)],
- WordAligned
- );
-
- fn_test_single_type!(
- UInt32Array,
- UInt32,
- vec![Some(5), Some(7), None, Some(0), Some(111)],
- Compact
+ vec![Some(5), Some(7), None, Some(0), Some(111)]
);
fn_test_single_type!(
UInt32Array,
UInt32,
- vec![Some(5), Some(7), None, Some(0), Some(111)],
- WordAligned
- );
-
- fn_test_single_type!(
- UInt64Array,
- UInt64,
- vec![Some(5), Some(7), None, Some(0), Some(111)],
- Compact
+ vec![Some(5), Some(7), None, Some(0), Some(111)]
);
fn_test_single_type!(
UInt64Array,
UInt64,
- vec![Some(5), Some(7), None, Some(0), Some(111)],
- WordAligned
+ vec![Some(5), Some(7), None, Some(0), Some(111)]
);
fn_test_single_type!(
Float32Array,
Float32,
- vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)],
- Compact
- );
-
- fn_test_single_type!(
- Float32Array,
- Float32,
- vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)],
- WordAligned
+ vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)]
);
fn_test_single_type!(
Float64Array,
Float64,
- vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)],
- Compact
- );
-
- fn_test_single_type!(
- Float64Array,
- Float64,
- vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)],
- WordAligned
+ vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)]
);
fn_test_single_type!(
Date32Array,
Date32,
- vec![Some(5), Some(7), None, Some(0), Some(111)],
- Compact
- );
-
- fn_test_single_type!(
- Date32Array,
- Date32,
- vec![Some(5), Some(7), None, Some(0), Some(111)],
- WordAligned
- );
-
- fn_test_single_type!(
- Date64Array,
- Date64,
- vec![Some(5), Some(7), None, Some(0), Some(111)],
- Compact
+ vec![Some(5), Some(7), None, Some(0), Some(111)]
);
fn_test_single_type!(
Date64Array,
Date64,
- vec![Some(5), Some(7), None, Some(0), Some(111)],
- WordAligned
+ vec![Some(5), Some(7), None, Some(0), Some(111)]
);
-
- fn_test_single_type!(
- StringArray,
- Utf8,
- vec![Some("hello"), Some("world"), None, Some(""), Some("")],
- Compact
- );
-
- #[test]
- fn test_single_binary_jit() -> Result<()> {
- let schema = Arc::new(Schema::new(vec![Field::new("a", Binary,
true)]));
- let values: Vec<Option<&[u8]>> =
- vec![Some(b"one"), Some(b"two"), None, Some(b""), Some(b"three")];
- let a = BinaryArray::from_opt_vec(values);
- let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?;
- let mut vector = vec![0; 8192];
- let assembler = Assembler::default();
- let row_offsets = {
- write_batch_unchecked_jit(
- &mut vector,
- 0,
- &batch,
- 0,
- schema.clone(),
- &assembler,
- Compact,
- )?
- };
- let output_batch =
- { read_as_batch_jit(&vector, schema, &row_offsets, &assembler,
Compact)? };
- assert_eq!(batch, output_batch);
- Ok(())
- }
-
- #[test]
- fn test_single_binary_jit_null_free() -> Result<()> {
- let schema = Arc::new(Schema::new(vec![Field::new("a", Binary,
false)]));
- let values: Vec<&[u8]> = vec![b"one", b"two", b"", b"three"];
- let a = BinaryArray::from_vec(values);
- let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?;
- let mut vector = vec![0; 8192];
- let assembler = Assembler::default();
- let row_offsets = {
- write_batch_unchecked_jit(
- &mut vector,
- 0,
- &batch,
- 0,
- schema.clone(),
- &assembler,
- Compact,
- )?
- };
- let output_batch =
- { read_as_batch_jit(&vector, schema, &row_offsets, &assembler,
Compact)? };
- assert_eq!(batch, output_batch);
- Ok(())
- }
}
diff --git a/datafusion/row/src/jit/reader.rs b/datafusion/row/src/jit/reader.rs
index d670354245..5d91de7be3 100644
--- a/datafusion/row/src/jit/reader.rs
+++ b/datafusion/row/src/jit/reader.rs
@@ -18,7 +18,6 @@
//! Accessing row from raw bytes with JIT
use crate::jit::fn_name;
-use crate::layout::RowType;
use crate::reader::RowReader;
use crate::reader::*;
use crate::reg_fn;
@@ -39,11 +38,10 @@ pub fn read_as_batch_jit(
schema: Arc<Schema>,
offsets: &[usize],
assembler: &Assembler,
- row_type: RowType,
) -> Result<RecordBatch> {
let row_num = offsets.len();
let mut output = MutableRecordBatch::new(row_num, schema.clone());
- let mut row = RowReader::new(&schema, row_type);
+ let mut row = RowReader::new(&schema);
register_read_functions(assembler)?;
let gen_func = gen_read_row(&schema, assembler)?;
let mut jit = assembler.create_jit();
@@ -84,8 +82,6 @@ fn register_read_functions(asm: &Assembler) -> Result<()> {
reg_fn!(asm, read_field_f64, reader_param.clone(), None);
reg_fn!(asm, read_field_date32, reader_param.clone(), None);
reg_fn!(asm, read_field_date64, reader_param.clone(), None);
- reg_fn!(asm, read_field_utf8, reader_param.clone(), None);
- reg_fn!(asm, read_field_binary, reader_param.clone(), None);
reg_fn!(asm, read_field_bool_null_free, reader_param.clone(), None);
reg_fn!(asm, read_field_u8_null_free, reader_param.clone(), None);
reg_fn!(asm, read_field_u16_null_free, reader_param.clone(), None);
@@ -98,9 +94,7 @@ fn register_read_functions(asm: &Assembler) -> Result<()> {
reg_fn!(asm, read_field_f32_null_free, reader_param.clone(), None);
reg_fn!(asm, read_field_f64_null_free, reader_param.clone(), None);
reg_fn!(asm, read_field_date32_null_free, reader_param.clone(), None);
- reg_fn!(asm, read_field_date64_null_free, reader_param.clone(), None);
- reg_fn!(asm, read_field_utf8_null_free, reader_param.clone(), None);
- reg_fn!(asm, read_field_binary_null_free, reader_param, None);
+ reg_fn!(asm, read_field_date64_null_free, reader_param, None);
Ok(())
}
@@ -134,8 +128,6 @@ fn gen_read_row(schema: &Schema, assembler: &Assembler) ->
Result<GeneratedFunct
Float64 => b.call_stmt("read_field_f64", params)?,
Date32 => b.call_stmt("read_field_date32", params)?,
Date64 => b.call_stmt("read_field_date64", params)?,
- Utf8 => b.call_stmt("read_field_utf8", params)?,
- Binary => b.call_stmt("read_field_binary", params)?,
_ => unimplemented!(),
}
} else {
@@ -153,8 +145,6 @@ fn gen_read_row(schema: &Schema, assembler: &Assembler) ->
Result<GeneratedFunct
Float64 => b.call_stmt("read_field_f64_null_free", params)?,
Date32 => b.call_stmt("read_field_date32_null_free", params)?,
Date64 => b.call_stmt("read_field_date64_null_free", params)?,
- Utf8 => b.call_stmt("read_field_utf8_null_free", params)?,
- Binary => b.call_stmt("read_field_binary_null_free", params)?,
_ => unimplemented!(),
}
}
diff --git a/datafusion/row/src/jit/writer.rs b/datafusion/row/src/jit/writer.rs
index 38c3bd4114..cb96f34fab 100644
--- a/datafusion/row/src/jit/writer.rs
+++ b/datafusion/row/src/jit/writer.rs
@@ -18,7 +18,6 @@
//! Reusable JIT version of row writer backed by Vec<u8> to stitch attributes
together
use crate::jit::fn_name;
-use crate::layout::RowType;
use crate::reg_fn;
use crate::schema_null_free;
use crate::writer::RowWriter;
@@ -44,9 +43,8 @@ pub fn write_batch_unchecked_jit(
row_idx: usize,
schema: Arc<Schema>,
assembler: &Assembler,
- row_type: RowType,
) -> Result<Vec<usize>> {
- let mut writer = RowWriter::new(&schema, row_type);
+ let mut writer = RowWriter::new(&schema);
let mut current_offset = offset;
let mut offsets = vec![];
register_write_functions(assembler)?;
@@ -61,7 +59,6 @@ pub fn write_batch_unchecked_jit(
for cur_row in row_idx..batch.num_rows() {
offsets.push(current_offset);
code_fn(&mut writer, cur_row, batch);
- writer.end_padding();
let row_width = writer.row_width;
output[current_offset..current_offset + row_width]
.copy_from_slice(writer.get_row());
@@ -76,10 +73,9 @@ pub fn write_batch_unchecked_jit(
pub fn bench_write_batch_jit(
batches: &[Vec<RecordBatch>],
schema: Arc<Schema>,
- row_type: RowType,
) -> Result<Vec<usize>> {
let assembler = Assembler::default();
- let mut writer = RowWriter::new(&schema, row_type);
+ let mut writer = RowWriter::new(&schema);
let mut lengths = vec![];
register_write_functions(&assembler)?;
let gen_func = gen_write_row(&schema, &assembler)?;
@@ -92,7 +88,6 @@ pub fn bench_write_batch_jit(
for batch in batches.iter().flatten() {
for cur_row in 0..batch.num_rows() {
code_fn(&mut writer, cur_row, batch);
- writer.end_padding();
lengths.push(writer.row_width);
writer.reset()
}
@@ -123,9 +118,7 @@ fn register_write_functions(asm: &Assembler) -> Result<()> {
reg_fn!(asm, write_field_f32, reader_param.clone(), None);
reg_fn!(asm, write_field_f64, reader_param.clone(), None);
reg_fn!(asm, write_field_date32, reader_param.clone(), None);
- reg_fn!(asm, write_field_date64, reader_param.clone(), None);
- reg_fn!(asm, write_field_utf8, reader_param.clone(), None);
- reg_fn!(asm, write_field_binary, reader_param, None);
+ reg_fn!(asm, write_field_date64, reader_param, None);
Ok(())
}
@@ -202,8 +195,6 @@ fn write_typed_field_stmt(
Float64 => b.call_stmt("write_field_f64", params)?,
Date32 => b.call_stmt("write_field_date32", params)?,
Date64 => b.call_stmt("write_field_date64", params)?,
- Utf8 => b.call_stmt("write_field_utf8", params)?,
- Binary => b.call_stmt("write_field_binary", params)?,
_ => unimplemented!(),
}
Ok(())
diff --git a/datafusion/row/src/layout.rs b/datafusion/row/src/layout.rs
index 6a8e8a78ec..7147132753 100644
--- a/datafusion/row/src/layout.rs
+++ b/datafusion/row/src/layout.rs
@@ -21,64 +21,54 @@ use crate::schema_null_free;
use arrow::datatypes::{DataType, Schema};
use arrow::util::bit_util::{ceil, round_upto_power_of_2};
-const UTF8_DEFAULT_SIZE: usize = 20;
-const BINARY_DEFAULT_SIZE: usize = 100;
-
-#[derive(Copy, Clone, Debug)]
-/// Type of a RowLayout
-pub enum RowType {
- /// Stores each field with minimum bytes for space efficiency.
- ///
- /// Its typical use case represents a sorting payload that
- /// accesses all row fields as a unit.
- ///
- /// Each tuple consists of up to three parts: "`null bit set`" ,
- /// "`values`" and "`var length data`"
- ///
- /// The null bit set is used for null tracking and is aligned to 1-byte.
It stores
- /// one bit per field.
- ///
- /// In the region of the values, we store the fields in the order they are
defined in the schema.
- /// - For fixed-length, sequential access fields, we store them directly.
- /// E.g., 4 bytes for int and 1 byte for bool.
- /// - For fixed-length, update often fields, we store one 8-byte word per
field.
- /// - For fields of non-primitive or variable-length types,
- /// we append their actual content to the end of the var length
region and
- /// store their offset relative to row base and their length, packed
into an 8-byte word.
- ///
- /// ```plaintext
- /// ┌────────────────┬──────────────────────────┬───────────────────────┐
┌───────────────────────┬────────────┐
- /// │Validity Bitmask│ Fixed Width Field │ Variable Width Field │
... │ vardata area │ padding │
- /// │ (byte aligned) │ (native type width) │(vardata offset + len) │
│ (variable length) │ bytes │
- /// └────────────────┴──────────────────────────┴───────────────────────┘
└───────────────────────┴────────────┘
- /// ```
- ///
- /// For example, given the schema (Int8, Utf8, Float32, Utf8)
- ///
- /// Encoding the tuple (1, "FooBar", NULL, "baz")
- ///
- /// Requires 32 bytes (31 bytes payload and 1 byte padding to make each
tuple 8-bytes aligned):
- ///
- /// ```plaintext
- ///
┌──────────┬──────────┬──────────────────────┬──────────────┬──────────────────────┬───────────────────────┬──────────┐
- /// │0b00001011│ 0x01 │0x00000016 0x00000006│ 0x00000000
│0x0000001C 0x00000003│ FooBarbaz │ 0x00 │
- ///
└──────────┴──────────┴──────────────────────┴──────────────┴──────────────────────┴───────────────────────┴──────────┘
- /// 0 1 2 10 14
22 31 32
- /// ```
- Compact,
-
- /// This type of layout stores one 8-byte word per field for CPU-friendly,
- /// It is mainly used to represent the rows with frequently updated
content,
- /// for example, grouping state for hash aggregation.
- WordAligned,
- // RawComparable,
-}
-
-/// Reveals how the fields of a record are stored in the raw-bytes format
+/// Row layout stores each field in one or more 8-byte words, giving a
+/// CPU-friendly layout for efficient processing.
+///
+/// It is mainly used to represent the rows with frequently updated content,
+/// for example, grouping state for hash aggregation.
+///
+/// Each tuple consists of two parts: "`null bit set`" and "`values`".
+///
+/// For null-free tuples, the null bit set can be omitted.
+///
+/// The null bit set, when present, is aligned to 8 bytes. It stores one bit
per field.
+///
+/// In the region of the values, we store the fields in the order they are
defined in the schema.
+/// Each field is stored in one or multiple 8-byte words.
+///
+/// ```plaintext
+/// ┌─────────────────┬─────────────────────┐
+/// │Validity Bitmask │ Fields │
+/// │ (8-byte aligned)│ (8-byte words) │
+/// └─────────────────┴─────────────────────┘
+/// ```
+///
+/// For example, given the schema (Int8, Float32, Int64) with a null-free
tuple
+///
+/// Encoding the tuple (1, 3.14, 42)
+///
+/// Requires 24 bytes (3 fields * 8 bytes each):
+///
+/// ```plaintext
+/// ┌──────────────────────┬──────────────────────┬──────────────────────┐
+/// │ 0x01 │ 0x4048F5C3 │ 0x0000002A │
+/// └──────────────────────┴──────────────────────┴──────────────────────┘
+/// 0 8 16 24
+/// ```
+///
+/// If the schema allows null values and the tuple is (1, NULL, 42)
+///
+/// Encoding the tuple requires 32 bytes (1 * 8 bytes for the null bit set +
3 fields * 8 bytes each):
+///
+/// ```plaintext
+///
┌──────────────────────────┬──────────────────────┬──────────────────────┬──────────────────────┐
+/// │ 0b00000101 │ 0x01 │ 0x00000000 │
0x0000002A │
+/// │ (7 bytes padding after) │ │ │
│
+///
└──────────────────────────┴──────────────────────┴──────────────────────┴──────────────────────┘
+/// 0 8 16
24 32
+/// ```
#[derive(Debug, Clone)]
pub struct RowLayout {
- /// Type of the layout
- row_type: RowType,
/// If a row is null free according to its schema
pub(crate) null_free: bool,
/// The number of bytes used to store null bits for each field.
@@ -93,27 +83,20 @@ pub struct RowLayout {
impl RowLayout {
/// new
- pub fn new(schema: &Schema, row_type: RowType) -> Self {
+ pub fn new(schema: &Schema) -> Self {
assert!(
- row_supported(schema, row_type),
- "{row_type:?}Row with {schema:?} not supported yet.",
+ row_supported(schema),
+ "Row with {schema:?} not supported yet.",
);
let null_free = schema_null_free(schema);
let field_count = schema.fields().len();
let null_width = if null_free {
0
} else {
- match row_type {
- RowType::Compact => ceil(field_count, 8),
- RowType::WordAligned =>
round_upto_power_of_2(ceil(field_count, 8), 8),
- }
- };
- let (field_offsets, values_width) = match row_type {
- RowType::Compact => compact_offsets(null_width, schema),
- RowType::WordAligned => word_aligned_offsets(null_width, schema),
+ round_upto_power_of_2(ceil(field_count, 8), 8)
};
+ let (field_offsets, values_width) = word_aligned_offsets(null_width,
schema);
Self {
- row_type,
null_free,
null_width,
values_width,
@@ -129,36 +112,6 @@ impl RowLayout {
}
}
-/// Get relative offsets for each field and total width for values
-fn compact_offsets(null_width: usize, schema: &Schema) -> (Vec<usize>, usize) {
- let mut offsets = vec![];
- let mut offset = null_width;
- for f in schema.fields() {
- offsets.push(offset);
- offset += compact_type_width(f.data_type());
- }
- (offsets, offset - null_width)
-}
-
-fn var_length(dt: &DataType) -> bool {
- use DataType::*;
- matches!(dt, Utf8 | Binary)
-}
-
-fn compact_type_width(dt: &DataType) -> usize {
- use DataType::*;
- if var_length(dt) {
- return std::mem::size_of::<u64>();
- }
- match dt {
- Boolean | UInt8 | Int8 => 1,
- UInt16 | Int16 => 2,
- UInt32 | Int32 | Float32 | Date32 => 4,
- UInt64 | Int64 | Float64 | Date64 => 8,
- _ => unreachable!(),
- }
-}
-
fn word_aligned_offsets(null_width: usize, schema: &Schema) -> (Vec<usize>,
usize) {
let mut offsets = vec![];
let mut offset = null_width;
@@ -175,76 +128,30 @@ fn word_aligned_offsets(null_width: usize, schema:
&Schema) -> (Vec<usize>, usiz
(offsets, offset - null_width)
}
-/// Estimate row width based on schema
-pub(crate) fn estimate_row_width(schema: &Schema, layout: &RowLayout) -> usize
{
- let mut width = layout.fixed_part_width();
- if matches!(layout.row_type, RowType::WordAligned) {
- return width;
- }
- for f in schema.fields() {
- match f.data_type() {
- DataType::Utf8 => width += UTF8_DEFAULT_SIZE,
- DataType::Binary => width += BINARY_DEFAULT_SIZE,
- _ => {}
- }
- }
- round_upto_power_of_2(width, 8)
-}
-
/// Return true if data in `schema` can be converted to raw-bytes
/// based rows.
///
/// Note that not all schemas can be supported in the row format
-pub fn row_supported(schema: &Schema, row_type: RowType) -> bool {
- schema
- .fields()
- .iter()
- .all(|f| supported_type(f.data_type(), row_type))
-}
-
-fn supported_type(dt: &DataType, row_type: RowType) -> bool {
- use DataType::*;
-
- match row_type {
- RowType::Compact => {
- matches!(
- dt,
- Boolean
- | UInt8
- | UInt16
- | UInt32
- | UInt64
- | Int8
- | Int16
- | Int32
- | Int64
- | Float32
- | Float64
- | Date32
- | Date64
- | Utf8
- | Binary
- )
- }
- // only fixed length types are supported for fast in-place update.
- RowType::WordAligned => {
- matches!(
- dt,
- Boolean
- | UInt8
- | UInt16
- | UInt32
- | UInt64
- | Int8
- | Int16
- | Int32
- | Int64
- | Float32
- | Float64
- | Date32
- | Date64
- | Decimal128(_, _)
- )
- }
- }
+pub fn row_supported(schema: &Schema) -> bool {
+ schema.fields().iter().all(|f| {
+ let dt = f.data_type();
+ use DataType::*;
+ matches!(
+ dt,
+ Boolean
+ | UInt8
+ | UInt16
+ | UInt32
+ | UInt64
+ | Int8
+ | Int16
+ | Int32
+ | Int64
+ | Float32
+ | Float64
+ | Date32
+ | Date64
+ | Decimal128(_, _)
+ )
+ })
}
diff --git a/datafusion/row/src/lib.rs b/datafusion/row/src/lib.rs
index aab929f270..6d00bb44c8 100644
--- a/datafusion/row/src/lib.rs
+++ b/datafusion/row/src/lib.rs
@@ -36,7 +36,6 @@ use arrow::datatypes::Schema;
use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;
pub use layout::row_supported;
-pub use layout::RowType;
use std::sync::Arc;
pub mod accessor;
@@ -102,7 +101,6 @@ fn get_columns(mut arrays: Vec<Box<dyn ArrayBuilder>>) ->
Vec<ArrayRef> {
#[cfg(test)]
mod tests {
use super::*;
- use crate::layout::RowType::{Compact, WordAligned};
use crate::reader::read_as_batch;
use crate::writer::write_batch_unchecked;
use arrow::record_batch::RecordBatch;
@@ -111,33 +109,33 @@ mod tests {
use DataType::*;
macro_rules! fn_test_single_type {
- ($ARRAY: ident, $TYPE: expr, $VEC: expr, $ROWTYPE: expr) => {
+ ($ARRAY: ident, $TYPE: expr, $VEC: expr) => {
paste::item! {
#[test]
#[allow(non_snake_case)]
- fn [<test_ $ROWTYPE _single_ $TYPE>]() -> Result<()> {
+ fn [<test _single_ $TYPE>]() -> Result<()> {
let schema = Arc::new(Schema::new(vec![Field::new("a",
$TYPE, true)]));
let a = $ARRAY::from($VEC);
let batch = RecordBatch::try_new(schema.clone(),
vec![Arc::new(a)])?;
let mut vector = vec![0; 1024];
let row_offsets =
- { write_batch_unchecked(&mut vector, 0, &batch, 0,
schema.clone(), $ROWTYPE) };
- let output_batch = { read_as_batch(&vector, schema,
&row_offsets, $ROWTYPE)? };
+ { write_batch_unchecked(&mut vector, 0, &batch, 0,
schema.clone()) };
+ let output_batch = { read_as_batch(&vector, schema,
&row_offsets)? };
assert_eq!(batch, output_batch);
Ok(())
}
#[test]
#[allow(non_snake_case)]
- fn [<test_ $ROWTYPE _single_ $TYPE _null_free>]() ->
Result<()> {
+ fn [<test_single_ $TYPE _null_free>]() -> Result<()> {
let schema = Arc::new(Schema::new(vec![Field::new("a",
$TYPE, false)]));
let v = $VEC.into_iter().filter(|o|
o.is_some()).collect::<Vec<_>>();
let a = $ARRAY::from(v);
let batch = RecordBatch::try_new(schema.clone(),
vec![Arc::new(a)])?;
let mut vector = vec![0; 1024];
let row_offsets =
- { write_batch_unchecked(&mut vector, 0, &batch, 0,
schema.clone(), $ROWTYPE) };
- let output_batch = { read_as_batch(&vector, schema,
&row_offsets, $ROWTYPE)? };
+ { write_batch_unchecked(&mut vector, 0, &batch, 0,
schema.clone()) };
+ let output_batch = { read_as_batch(&vector, schema,
&row_offsets)? };
assert_eq!(batch, output_batch);
Ok(())
}
@@ -148,229 +146,89 @@ mod tests {
fn_test_single_type!(
BooleanArray,
Boolean,
- vec![Some(true), Some(false), None, Some(true), None],
- Compact
- );
-
- fn_test_single_type!(
- BooleanArray,
- Boolean,
- vec![Some(true), Some(false), None, Some(true), None],
- WordAligned
- );
-
- fn_test_single_type!(
- Int8Array,
- Int8,
- vec![Some(5), Some(7), None, Some(0), Some(111)],
- Compact
+ vec![Some(true), Some(false), None, Some(true), None]
);
fn_test_single_type!(
Int8Array,
Int8,
- vec![Some(5), Some(7), None, Some(0), Some(111)],
- WordAligned
+ vec![Some(5), Some(7), None, Some(0), Some(111)]
);
fn_test_single_type!(
Int16Array,
Int16,
- vec![Some(5), Some(7), None, Some(0), Some(111)],
- Compact
- );
-
- fn_test_single_type!(
- Int16Array,
- Int16,
- vec![Some(5), Some(7), None, Some(0), Some(111)],
- WordAligned
+ vec![Some(5), Some(7), None, Some(0), Some(111)]
);
fn_test_single_type!(
Int32Array,
Int32,
- vec![Some(5), Some(7), None, Some(0), Some(111)],
- Compact
- );
-
- fn_test_single_type!(
- Int32Array,
- Int32,
- vec![Some(5), Some(7), None, Some(0), Some(111)],
- WordAligned
+ vec![Some(5), Some(7), None, Some(0), Some(111)]
);
fn_test_single_type!(
Int64Array,
Int64,
- vec![Some(5), Some(7), None, Some(0), Some(111)],
- Compact
- );
-
- fn_test_single_type!(
- Int64Array,
- Int64,
- vec![Some(5), Some(7), None, Some(0), Some(111)],
- WordAligned
- );
-
- fn_test_single_type!(
- UInt8Array,
- UInt8,
- vec![Some(5), Some(7), None, Some(0), Some(111)],
- Compact
+ vec![Some(5), Some(7), None, Some(0), Some(111)]
);
fn_test_single_type!(
UInt8Array,
UInt8,
- vec![Some(5), Some(7), None, Some(0), Some(111)],
- WordAligned
+ vec![Some(5), Some(7), None, Some(0), Some(111)]
);
fn_test_single_type!(
UInt16Array,
UInt16,
- vec![Some(5), Some(7), None, Some(0), Some(111)],
- Compact
- );
-
- fn_test_single_type!(
- UInt16Array,
- UInt16,
- vec![Some(5), Some(7), None, Some(0), Some(111)],
- WordAligned
- );
-
- fn_test_single_type!(
- UInt32Array,
- UInt32,
- vec![Some(5), Some(7), None, Some(0), Some(111)],
- Compact
+ vec![Some(5), Some(7), None, Some(0), Some(111)]
);
fn_test_single_type!(
UInt32Array,
UInt32,
- vec![Some(5), Some(7), None, Some(0), Some(111)],
- WordAligned
- );
-
- fn_test_single_type!(
- UInt64Array,
- UInt64,
- vec![Some(5), Some(7), None, Some(0), Some(111)],
- Compact
+ vec![Some(5), Some(7), None, Some(0), Some(111)]
);
fn_test_single_type!(
UInt64Array,
UInt64,
- vec![Some(5), Some(7), None, Some(0), Some(111)],
- WordAligned
+ vec![Some(5), Some(7), None, Some(0), Some(111)]
);
fn_test_single_type!(
Float32Array,
Float32,
- vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)],
- Compact
- );
-
- fn_test_single_type!(
- Float32Array,
- Float32,
- vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)],
- WordAligned
+ vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)]
);
fn_test_single_type!(
Float64Array,
Float64,
- vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)],
- Compact
- );
-
- fn_test_single_type!(
- Float64Array,
- Float64,
- vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)],
- WordAligned
+ vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)]
);
fn_test_single_type!(
Date32Array,
Date32,
- vec![Some(5), Some(7), None, Some(0), Some(111)],
- Compact
- );
-
- fn_test_single_type!(
- Date32Array,
- Date32,
- vec![Some(5), Some(7), None, Some(0), Some(111)],
- WordAligned
- );
-
- fn_test_single_type!(
- Date64Array,
- Date64,
- vec![Some(5), Some(7), None, Some(0), Some(111)],
- Compact
+ vec![Some(5), Some(7), None, Some(0), Some(111)]
);
fn_test_single_type!(
Date64Array,
Date64,
- vec![Some(5), Some(7), None, Some(0), Some(111)],
- WordAligned
- );
-
- fn_test_single_type!(
- StringArray,
- Utf8,
- vec![Some("hello"), Some("world"), None, Some(""), Some("")],
- Compact
+ vec![Some(5), Some(7), None, Some(0), Some(111)]
);
#[test]
#[should_panic(expected = "not supported yet")]
- fn test_unsupported_word_aligned_type() {
+ fn test_unsupported_type() {
let a: ArrayRef = Arc::new(StringArray::from(vec!["hello", "world"]));
let batch = RecordBatch::try_from_iter(vec![("a", a)]).unwrap();
let schema = batch.schema();
let mut vector = vec![0; 1024];
- write_batch_unchecked(&mut vector, 0, &batch, 0, schema, WordAligned);
- }
-
- #[test]
- fn test_single_binary() -> Result<()> {
- let schema = Arc::new(Schema::new(vec![Field::new("a", Binary,
true)]));
- let values: Vec<Option<&[u8]>> =
- vec![Some(b"one"), Some(b"two"), None, Some(b""), Some(b"three")];
- let a = BinaryArray::from_opt_vec(values);
- let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?;
- let mut vector = vec![0; 8192];
- let row_offsets =
- { write_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone(),
Compact) };
- let output_batch = { read_as_batch(&vector, schema, &row_offsets,
Compact)? };
- assert_eq!(batch, output_batch);
- Ok(())
- }
-
- #[test]
- fn test_single_binary_null_free() -> Result<()> {
- let schema = Arc::new(Schema::new(vec![Field::new("a", Binary,
false)]));
- let values: Vec<&[u8]> = vec![b"one", b"two", b"", b"three"];
- let a = BinaryArray::from_vec(values);
- let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?;
- let mut vector = vec![0; 8192];
- let row_offsets =
- { write_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone(),
Compact) };
- let output_batch = { read_as_batch(&vector, schema, &row_offsets,
Compact)? };
- assert_eq!(batch, output_batch);
- Ok(())
+ write_batch_unchecked(&mut vector, 0, &batch, 0, schema);
}
#[test]
@@ -380,16 +238,15 @@ mod tests {
let batch = RecordBatch::try_from_iter(vec![("a", a)]).unwrap();
let schema = batch.schema();
let mut vector = vec![0; 1024];
- write_batch_unchecked(&mut vector, 0, &batch, 0, schema, Compact);
+ write_batch_unchecked(&mut vector, 0, &batch, 0, schema);
}
#[test]
#[should_panic(expected = "not supported yet")]
fn test_unsupported_type_read() {
- let schema =
- Arc::new(Schema::new(vec![Field::new("a", Decimal128(5, 2),
false)]));
+ let schema = Arc::new(Schema::new(vec![Field::new("a", Utf8, false)]));
let vector = vec![0; 1024];
let row_offsets = vec![0];
- read_as_batch(&vector, schema, &row_offsets, Compact).unwrap();
+ read_as_batch(&vector, schema, &row_offsets).unwrap();
}
}
diff --git a/datafusion/row/src/reader.rs b/datafusion/row/src/reader.rs
index a8dc8211f0..10c9896df7 100644
--- a/datafusion/row/src/reader.rs
+++ b/datafusion/row/src/reader.rs
@@ -17,7 +17,7 @@
//! [`read_as_batch`] converts raw bytes to [`RecordBatch`]
-use crate::layout::{RowLayout, RowType};
+use crate::layout::RowLayout;
use crate::validity::{all_valid, NullBitsFormatter};
use crate::MutableRecordBatch;
use arrow::array::*;
@@ -47,11 +47,10 @@ pub fn read_as_batch(
data: &[u8],
schema: Arc<Schema>,
offsets: &[usize],
- row_type: RowType,
) -> Result<RecordBatch> {
let row_num = offsets.len();
let mut output = MutableRecordBatch::new(row_num, schema.clone());
- let mut row = RowReader::new(&schema, row_type);
+ let mut row = RowReader::new(&schema);
for offset in offsets.iter().take(row_num) {
row.point_to(*offset, data);
@@ -129,9 +128,9 @@ impl<'a> std::fmt::Debug for RowReader<'a> {
impl<'a> RowReader<'a> {
/// new
- pub fn new(schema: &Schema, row_type: RowType) -> Self {
+ pub fn new(schema: &Schema) -> Self {
Self {
- layout: RowLayout::new(schema, row_type),
+ layout: RowLayout::new(schema),
data: &[],
base_offset: 0,
}
@@ -217,25 +216,6 @@ impl<'a> RowReader<'a> {
get_idx!(i128, self, idx, 16)
}
- fn get_utf8(&self, idx: usize) -> &str {
- self.assert_index_valid(idx);
- let offset_size = self.get_u64(idx);
- let offset = (offset_size >> 32) as usize;
- let len = (offset_size & 0xffff_ffff) as usize;
- let varlena_offset = self.base_offset + offset;
- let bytes = &self.data[varlena_offset..varlena_offset + len];
- unsafe { std::str::from_utf8_unchecked(bytes) }
- }
-
- fn get_binary(&self, idx: usize) -> &[u8] {
- self.assert_index_valid(idx);
- let offset_size = self.get_u64(idx);
- let offset = (offset_size >> 32) as usize;
- let len = (offset_size & 0xffff_ffff) as usize;
- let varlena_offset = self.base_offset + offset;
- &self.data[varlena_offset..varlena_offset + len]
- }
-
fn_get_idx_opt!(bool);
fn_get_idx_opt!(u8);
fn_get_idx_opt!(u16);
@@ -271,14 +251,6 @@ impl<'a> RowReader<'a> {
None
}
}
-
- fn get_utf8_opt(&self, idx: usize) -> Option<&str> {
- if self.is_valid_at(idx) {
- Some(self.get_utf8(idx))
- } else {
- None
- }
- }
}
/// Read the row currently pointed by RowWriter to the output columnar batch
buffer
@@ -339,31 +311,8 @@ fn_read_field!(f32, Float32Builder);
fn_read_field!(f64, Float64Builder);
fn_read_field!(date32, Date32Builder);
fn_read_field!(date64, Date64Builder);
-fn_read_field!(utf8, StringBuilder);
fn_read_field!(decimal128, Decimal128Builder);
-pub(crate) fn read_field_binary(
- to: &mut Box<dyn ArrayBuilder>,
- col_idx: usize,
- row: &RowReader,
-) {
- let to = to.as_any_mut().downcast_mut::<BinaryBuilder>().unwrap();
- if row.is_valid_at(col_idx) {
- to.append_value(row.get_binary(col_idx));
- } else {
- to.append_null();
- }
-}
-
-pub(crate) fn read_field_binary_null_free(
- to: &mut Box<dyn ArrayBuilder>,
- col_idx: usize,
- row: &RowReader,
-) {
- let to = to.as_any_mut().downcast_mut::<BinaryBuilder>().unwrap();
- to.append_value(row.get_binary(col_idx));
-}
-
fn read_field(
to: &mut Box<dyn ArrayBuilder>,
dt: &DataType,
@@ -385,8 +334,6 @@ fn read_field(
Float64 => read_field_f64(to, col_idx, row),
Date32 => read_field_date32(to, col_idx, row),
Date64 => read_field_date64(to, col_idx, row),
- Utf8 => read_field_utf8(to, col_idx, row),
- Binary => read_field_binary(to, col_idx, row),
Decimal128(_, _) => read_field_decimal128(to, col_idx, row),
_ => unimplemented!(),
}
@@ -413,8 +360,6 @@ fn read_field_null_free(
Float64 => read_field_f64_null_free(to, col_idx, row),
Date32 => read_field_date32_null_free(to, col_idx, row),
Date64 => read_field_date64_null_free(to, col_idx, row),
- Utf8 => read_field_utf8_null_free(to, col_idx, row),
- Binary => read_field_binary_null_free(to, col_idx, row),
Decimal128(_, _) => read_field_decimal128_null_free(to, col_idx, row),
_ => unimplemented!(),
}
diff --git a/datafusion/row/src/writer.rs b/datafusion/row/src/writer.rs
index 7bf9ac0267..14ce6afe68 100644
--- a/datafusion/row/src/writer.rs
+++ b/datafusion/row/src/writer.rs
@@ -17,17 +17,13 @@
//! [`RowWriter`] writes [`RecordBatch`]es to `Vec<u8>` to stitch attributes
together
-use crate::layout::{estimate_row_width, RowLayout, RowType};
+use crate::layout::RowLayout;
use arrow::array::*;
use arrow::datatypes::{DataType, Schema};
use arrow::record_batch::RecordBatch;
-use arrow::util::bit_util::{round_upto_power_of_2, set_bit_raw, unset_bit_raw};
-use datafusion_common::cast::{
- as_binary_array, as_date32_array, as_date64_array, as_decimal128_array,
- as_string_array,
-};
+use arrow::util::bit_util::{set_bit_raw, unset_bit_raw};
+use datafusion_common::cast::{as_date32_array, as_date64_array,
as_decimal128_array};
use datafusion_common::Result;
-use std::cmp::max;
use std::sync::Arc;
/// Append batch from `row_idx` to `output` buffer start from `offset`
@@ -40,9 +36,8 @@ pub fn write_batch_unchecked(
batch: &RecordBatch,
row_idx: usize,
schema: Arc<Schema>,
- row_type: RowType,
) -> Vec<usize> {
- let mut writer = RowWriter::new(&schema, row_type);
+ let mut writer = RowWriter::new(&schema);
let mut current_offset = offset;
let mut offsets = vec![];
let columns = batch.columns();
@@ -62,9 +57,8 @@ pub fn write_batch_unchecked(
pub fn bench_write_batch(
batches: &[Vec<RecordBatch>],
schema: Arc<Schema>,
- row_type: RowType,
) -> Result<Vec<usize>> {
- let mut writer = RowWriter::new(&schema, row_type);
+ let mut writer = RowWriter::new(&schema);
let mut lengths = vec![];
for batch in batches.iter().flatten() {
@@ -125,33 +119,24 @@ pub struct RowWriter {
data: Vec<u8>,
/// Length in bytes for the current tuple, 8-bytes word aligned.
pub(crate) row_width: usize,
- /// Length in bytes for `variable length data` part of the current tuple.
- varlena_width: usize,
- /// Current offset for the next variable length field to write to.
- varlena_offset: usize,
}
impl RowWriter {
/// New
- pub fn new(schema: &Schema, row_type: RowType) -> Self {
- let layout = RowLayout::new(schema, row_type);
- let init_capacity = estimate_row_width(schema, &layout);
- let varlena_offset = layout.fixed_part_width();
+ pub fn new(schema: &Schema) -> Self {
+ let layout = RowLayout::new(schema);
+ let init_capacity = layout.fixed_part_width();
Self {
layout,
data: vec![0; init_capacity],
- row_width: 0,
- varlena_width: 0,
- varlena_offset,
+ row_width: init_capacity,
}
}
/// Reset the row writer state for new tuple
pub fn reset(&mut self) {
self.data.fill(0);
- self.row_width = 0;
- self.varlena_width = 0;
- self.varlena_offset = self.layout.fixed_part_width();
+ self.row_width = self.layout.fixed_part_width();
}
#[inline]
@@ -230,45 +215,6 @@ impl RowWriter {
set_idx!(16, self, idx, value)
}
- fn set_offset_size(&mut self, idx: usize, size: u32) {
- let offset_and_size: u64 = (self.varlena_offset as u64) << 32 | (size
as u64);
- self.set_u64(idx, offset_and_size);
- }
-
- fn set_utf8(&mut self, idx: usize, value: &str) {
- self.assert_index_valid(idx);
- let bytes = value.as_bytes();
- let size = bytes.len();
- self.set_offset_size(idx, size as u32);
- let varlena_offset = self.varlena_offset;
- self.data[varlena_offset..varlena_offset +
size].copy_from_slice(bytes);
- self.varlena_offset += size;
- self.varlena_width += size;
- }
-
- fn set_binary(&mut self, idx: usize, value: &[u8]) {
- self.assert_index_valid(idx);
- let size = value.len();
- self.set_offset_size(idx, size as u32);
- let varlena_offset = self.varlena_offset;
- self.data[varlena_offset..varlena_offset +
size].copy_from_slice(value);
- self.varlena_offset += size;
- self.varlena_width += size;
- }
-
- fn current_width(&self) -> usize {
- self.layout.fixed_part_width() + self.varlena_width
- }
-
- /// End each row at 8-byte word boundary.
- pub(crate) fn end_padding(&mut self) {
- let payload_width = self.current_width();
- self.row_width = round_upto_power_of_2(payload_width, 8);
- if self.data.len() < self.row_width {
- self.data.resize(self.row_width, 0);
- }
- }
-
/// Get raw bytes
pub fn get_row(&self) -> &[u8] {
&self.data[0..self.row_width]
@@ -298,7 +244,6 @@ pub fn write_row(
}
}
- row_writer.end_padding();
row_writer.row_width
}
@@ -350,36 +295,6 @@ pub(crate) fn write_field_date64(
to.set_date64(col_idx, from.value(row_idx));
}
-pub(crate) fn write_field_utf8(
- to: &mut RowWriter,
- from: &Arc<dyn Array>,
- col_idx: usize,
- row_idx: usize,
-) {
- let from = as_string_array(from).unwrap();
- let s = from.value(row_idx);
- let new_width = to.current_width() + s.as_bytes().len();
- if new_width > to.data.len() {
- to.data.resize(max(to.data.capacity(), new_width), 0);
- }
- to.set_utf8(col_idx, s);
-}
-
-pub(crate) fn write_field_binary(
- to: &mut RowWriter,
- from: &Arc<dyn Array>,
- col_idx: usize,
- row_idx: usize,
-) {
- let from = as_binary_array(from).unwrap();
- let s = from.value(row_idx);
- let new_width = to.current_width() + s.len();
- if new_width > to.data.len() {
- to.data.resize(max(to.data.capacity(), new_width), 0);
- }
- to.set_binary(col_idx, s);
-}
-
pub(crate) fn write_field_decimal128(
to: &mut RowWriter,
from: &Arc<dyn Array>,
@@ -412,8 +327,6 @@ fn write_field(
Float64 => write_field_f64(row, col, col_idx, row_idx),
Date32 => write_field_date32(row, col, col_idx, row_idx),
Date64 => write_field_date64(row, col, col_idx, row_idx),
- Utf8 => write_field_utf8(row, col, col_idx, row_idx),
- Binary => write_field_binary(row, col, col_idx, row_idx),
Decimal128(_, _) => write_field_decimal128(row, col, col_idx, row_idx),
_ => unimplemented!(),
}