This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 5b1821e05 feat + fix: IPC support for run encoded array. (#3662)
5b1821e05 is described below
commit 5b1821e0564f586f6f98e5c392a0a208890055df
Author: askoa <[email protected]>
AuthorDate: Fri Feb 10 09:39:47 2023 -0500
feat + fix: IPC support for run encoded array. (#3662)
* Schema.fbs changes, flatbuffer generated code, flatbuffer gen script
changes
Add ipc reader, writer and equals
Add/Update tests
* Add support for non zero offset in run array
* clippy fixes
* format fix
* doc fix
* incorporate pr comments
* fix formatting
* more pr comments
* pr suggestions
---------
Co-authored-by: ask <ask@local>
Co-authored-by: devx <devx@local>
---
arrow-array/src/array/run_array.rs | 170 ++++++++++++++++++++++-----
arrow-data/src/data.rs | 13 ++-
arrow-data/src/equal/mod.rs | 5 +-
arrow-data/src/equal/run.rs | 84 ++++++++++++++
arrow-ipc/regen.sh | 32 ++----
arrow-ipc/src/convert.rs | 25 +++-
arrow-ipc/src/gen/Schema.rs | 120 ++++++++++++++++++-
arrow-ipc/src/gen/SparseTensor.rs | 25 ++++
arrow-ipc/src/gen/Tensor.rs | 25 ++++
arrow-ipc/src/reader.rs | 126 ++++++++++++++++++--
arrow-ipc/src/writer.rs | 229 ++++++++++++++++++++++++++++++++++---
arrow-select/src/take.rs | 1 -
format/Schema.fbs | 14 ++-
13 files changed, 779 insertions(+), 90 deletions(-)
diff --git a/arrow-array/src/array/run_array.rs b/arrow-array/src/array/run_array.rs
index 2e378c90f..33738d649 100644
--- a/arrow-array/src/array/run_array.rs
+++ b/arrow-array/src/array/run_array.rs
@@ -67,9 +67,9 @@ pub struct RunArray<R: RunEndIndexType> {
}
impl<R: RunEndIndexType> RunArray<R> {
- // calculates the logical length of the array encoded
- // by the given run_ends array.
- fn logical_len(run_ends: &PrimitiveArray<R>) -> usize {
+ /// Calculates the logical length of the array encoded
+ /// by the given run_ends array.
+ pub fn logical_len(run_ends: &PrimitiveArray<R>) -> usize {
let len = run_ends.len();
if len == 0 {
return 0;
@@ -145,14 +145,15 @@ impl<R: RunEndIndexType> RunArray<R> {
}
/// Returns index to the physical array for the given index to the logical
array.
+ /// The function does not adjust the input logical index based on
`ArrayData::offset`.
/// Performs a binary search on the run_ends array for the input index.
#[inline]
- pub fn get_physical_index(&self, logical_index: usize) -> Option<usize> {
- if logical_index >= self.len() {
+ pub fn get_zero_offset_physical_index(&self, logical_index: usize) ->
Option<usize> {
+ if logical_index >= Self::logical_len(&self.run_ends) {
return None;
}
let mut st: usize = 0;
- let mut en: usize = self.run_ends().len();
+ let mut en: usize = self.run_ends.len();
while st + 1 < en {
let mid: usize = (st + en) / 2;
if logical_index
@@ -164,7 +165,7 @@ impl<R: RunEndIndexType> RunArray<R> {
// `en` starts with len. The condition `st + 1 < en`
ensures
// `st` and `en` differs atleast by two. So the value of
`mid`
// will never be either `st` or `en`
- self.run_ends().value_unchecked(mid - 1).as_usize()
+ self.run_ends.value_unchecked(mid - 1).as_usize()
}
{
en = mid
@@ -175,6 +176,17 @@ impl<R: RunEndIndexType> RunArray<R> {
Some(st)
}
+ /// Returns index to the physical array for the given index to the logical
array.
+ /// This function adjusts the input logical index based on
`ArrayData::offset`
+ /// Performs a binary search on the run_ends array for the input index.
+ #[inline]
+ pub fn get_physical_index(&self, logical_index: usize) -> Option<usize> {
+ if logical_index >= self.len() {
+ return None;
+ }
+ self.get_zero_offset_physical_index(logical_index + self.offset())
+ }
+
/// Returns the physical indices of the input logical indices. Returns
error if any of the logical
/// index cannot be converted to physical index. The logical indices are
sorted and iterated along
/// with run_ends array to find matching physical index. The approach used
here was chosen over
@@ -192,6 +204,10 @@ impl<R: RunEndIndexType> RunArray<R> {
{
let indices_len = logical_indices.len();
+ if indices_len == 0 {
+ return Ok(vec![]);
+ }
+
// `ordered_indices` store index into `logical_indices` and can be used
// to iterate `logical_indices` in sorted order.
let mut ordered_indices: Vec<usize> = (0..indices_len).collect();
@@ -204,12 +220,30 @@ impl<R: RunEndIndexType> RunArray<R> {
.unwrap()
});
+ // Return early if all the logical indices cannot be converted to
physical indices.
+ let largest_logical_index =
+ logical_indices[*ordered_indices.last().unwrap()].as_usize();
+ if largest_logical_index >= self.len() {
+ return Err(ArrowError::InvalidArgumentError(format!(
+ "Cannot convert all logical indices to physical indices. The
logical index cannot be converted is {largest_logical_index}.",
+ )));
+ }
+
+ // Skip some physical indices based on offset.
+ let skip_value = if self.offset() > 0 {
+ self.get_zero_offset_physical_index(self.offset()).unwrap()
+ } else {
+ 0
+ };
+
let mut physical_indices = vec![0; indices_len];
let mut ordered_index = 0_usize;
- for (physical_index, run_end) in
self.run_ends.values().iter().enumerate() {
- // Get the run end index of current physical index
- let run_end_value = run_end.as_usize();
+ for (physical_index, run_end) in
+ self.run_ends.values().iter().enumerate().skip(skip_value)
+ {
+ // Get the run end index (relative to offset) of current physical
index
+ let run_end_value = run_end.as_usize() - self.offset();
// All the `logical_indices` that are less than current run end
index
// belongs to current physical index.
@@ -552,6 +586,34 @@ mod tests {
result
}
+ // Asserts that `logical_array[logical_indices[*]] ==
physical_array[physical_indices[*]]`
+ fn compare_logical_and_physical_indices(
+ logical_indices: &[u32],
+ logical_array: &[Option<i32>],
+ physical_indices: &[usize],
+ physical_array: &PrimitiveArray<Int32Type>,
+ ) {
+ assert_eq!(logical_indices.len(), physical_indices.len());
+
+ // check value in logical index in the logical_array matches physical
index in physical_array
+ logical_indices
+ .iter()
+ .map(|f| f.as_usize())
+ .zip(physical_indices.iter())
+ .for_each(|(logical_ix, physical_ix)| {
+ let expected = logical_array[logical_ix];
+ match expected {
+ Some(val) => {
+ assert!(physical_array.is_valid(*physical_ix));
+ let actual = physical_array.value(*physical_ix);
+ assert_eq!(val, actual);
+ }
+ None => {
+ assert!(physical_array.is_null(*physical_ix))
+ }
+ };
+ });
+ }
#[test]
fn test_run_array() {
// Construct a value array
@@ -824,23 +886,77 @@ mod tests {
assert_eq!(logical_indices.len(), physical_indices.len());
// check value in logical index in the input_array matches
physical index in typed_run_array
- logical_indices
- .iter()
- .map(|f| f.as_usize())
- .zip(physical_indices.iter())
- .for_each(|(logical_ix, physical_ix)| {
- let expected = input_array[logical_ix];
- match expected {
- Some(val) => {
-
assert!(physical_values_array.is_valid(*physical_ix));
- let actual =
physical_values_array.value(*physical_ix);
- assert_eq!(val, actual);
- }
- None => {
-
assert!(physical_values_array.is_null(*physical_ix))
- }
- };
- });
+ compare_logical_and_physical_indices(
+ &logical_indices,
+ &input_array,
+ &physical_indices,
+ physical_values_array,
+ );
+ }
+ }
+
+ #[test]
+ fn test_get_physical_indices_sliced() {
+ let total_len = 80;
+ let input_array = build_input_array(total_len);
+
+ // Encode the input_array to run array
+ let mut builder =
+ PrimitiveRunBuilder::<Int16Type,
Int32Type>::with_capacity(input_array.len());
+ builder.extend(input_array.iter().copied());
+ let run_array = builder.finish();
+ let physical_values_array =
as_primitive_array::<Int32Type>(run_array.values());
+
+ // test for all slice lengths.
+ for slice_len in 1..=total_len {
+ // create an array consisting of all the indices repeated twice
and shuffled.
+ let mut logical_indices: Vec<u32> = (0_u32..(slice_len as
u32)).collect();
+ // add same indices once more
+ logical_indices.append(&mut logical_indices.clone());
+ let mut rng = thread_rng();
+ logical_indices.shuffle(&mut rng);
+
+ // test for offset = 0 and slice length = slice_len
+ // slice the input array using which the run array was built.
+ let sliced_input_array = &input_array[0..slice_len];
+
+ // slice the run array
+ let sliced_run_array: RunArray<Int16Type> =
+ run_array.slice(0, slice_len).into_data().into();
+
+ // Get physical indices.
+ let physical_indices = sliced_run_array
+ .get_physical_indices(&logical_indices)
+ .unwrap();
+
+ compare_logical_and_physical_indices(
+ &logical_indices,
+ sliced_input_array,
+ &physical_indices,
+ physical_values_array,
+ );
+
+ // test for offset = total_len - slice_len and slice length =
slice_len
+ // slice the input array using which the run array was built.
+ let sliced_input_array = &input_array[total_len -
slice_len..total_len];
+
+ // slice the run array
+ let sliced_run_array: RunArray<Int16Type> = run_array
+ .slice(total_len - slice_len, slice_len)
+ .into_data()
+ .into();
+
+ // Get physical indices
+ let physical_indices = sliced_run_array
+ .get_physical_indices(&logical_indices)
+ .unwrap();
+
+ compare_logical_and_physical_indices(
+ &logical_indices,
+ sliced_input_array,
+ &physical_indices,
+ physical_values_array,
+ );
}
}
}
diff --git a/arrow-data/src/data.rs b/arrow-data/src/data.rs
index 8b727ec95..8742f8db9 100644
--- a/arrow-data/src/data.rs
+++ b/arrow-data/src/data.rs
@@ -1290,9 +1290,9 @@ impl ArrayData {
DataType::RunEndEncoded(run_ends, _values) => {
let run_ends_data = self.child_data()[0].clone();
match run_ends.data_type() {
- DataType::Int16 =>
run_ends_data.check_run_ends::<i16>(self.len()),
- DataType::Int32 =>
run_ends_data.check_run_ends::<i32>(self.len()),
- DataType::Int64 =>
run_ends_data.check_run_ends::<i64>(self.len()),
+ DataType::Int16 => run_ends_data.check_run_ends::<i16>(),
+ DataType::Int32 => run_ends_data.check_run_ends::<i32>(),
+ DataType::Int64 => run_ends_data.check_run_ends::<i64>(),
_ => unreachable!(),
}
}
@@ -1451,7 +1451,7 @@ impl ArrayData {
}
/// Validates that each value in run_ends array is positive and strictly
increasing.
- fn check_run_ends<T>(&self, array_len: usize) -> Result<(), ArrowError>
+ fn check_run_ends<T>(&self) -> Result<(), ArrowError>
where
T: ArrowNativeType + TryInto<i64> + num::Num + std::fmt::Display,
{
@@ -1478,9 +1478,10 @@ impl ArrayData {
Ok(())
})?;
- if prev_value.as_usize() != array_len {
+ if prev_value.as_usize() < (self.offset + self.len) {
return Err(ArrowError::InvalidArgumentError(format!(
- "The length of array does not match the last value in the
run_ends array. The last value of run_ends array is {prev_value} and length of
array is {array_len}."
+ "The offset + length of array should be less or equal to last
value in the run_ends array. The last value of run_ends array is {prev_value}
and offset + length of array is {}.",
+ self.offset + self.len
)));
}
Ok(())
diff --git a/arrow-data/src/equal/mod.rs b/arrow-data/src/equal/mod.rs
index aff61e3d3..871a312ca 100644
--- a/arrow-data/src/equal/mod.rs
+++ b/arrow-data/src/equal/mod.rs
@@ -31,6 +31,7 @@ mod fixed_list;
mod list;
mod null;
mod primitive;
+mod run;
mod structure;
mod union;
mod utils;
@@ -50,6 +51,8 @@ use structure::struct_equal;
use union::union_equal;
use variable_size::variable_sized_equal;
+use self::run::run_equal;
+
/// Compares the values of two [ArrayData] starting at `lhs_start` and
`rhs_start` respectively
/// for `len` slots.
#[inline]
@@ -137,7 +140,7 @@ fn equal_values(
},
DataType::Float16 => primitive_equal::<f16>(lhs, rhs, lhs_start,
rhs_start, len),
DataType::Map(_, _) => list_equal::<i32>(lhs, rhs, lhs_start,
rhs_start, len),
- DataType::RunEndEncoded(_, _) => todo!(),
+ DataType::RunEndEncoded(_, _) => run_equal(lhs, rhs, lhs_start,
rhs_start, len),
}
}
diff --git a/arrow-data/src/equal/run.rs b/arrow-data/src/equal/run.rs
new file mode 100644
index 000000000..ede172c99
--- /dev/null
+++ b/arrow-data/src/equal/run.rs
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::data::ArrayData;
+
+use super::equal_range;
+
+/// The current implementation of comparison of run array support physical
comparison.
+/// Comparing run encoded array based on logical indices (`lhs_start`,
`rhs_start`) will
+/// be time consuming as converting from logical index to physical index
cannot be done
+/// in constant time. The current comparison compares the underlying physical
arrays.
+pub(super) fn run_equal(
+ lhs: &ArrayData,
+ rhs: &ArrayData,
+ lhs_start: usize,
+ rhs_start: usize,
+ len: usize,
+) -> bool {
+ if lhs_start != 0
+ || rhs_start != 0
+ || (lhs.len() != len && rhs.len() != len)
+ || lhs.offset() > 0
+ || rhs.offset() > 0
+ {
+ unimplemented!("Logical comparison for run array not supported.")
+ }
+
+ if lhs.len() != rhs.len() {
+ return false;
+ }
+
+ let lhs_run_ends_array = lhs.child_data().get(0).unwrap();
+ let lhs_values_array = lhs.child_data().get(1).unwrap();
+
+ let rhs_run_ends_array = rhs.child_data().get(0).unwrap();
+ let rhs_values_array = rhs.child_data().get(1).unwrap();
+
+ if lhs_run_ends_array.len() != rhs_run_ends_array.len() {
+ return false;
+ }
+
+ if lhs_values_array.len() != rhs_values_array.len() {
+ return false;
+ }
+
+ // check run ends array are equal. The length of the physical array
+ // is used to validate the child arrays.
+ let run_ends_equal = equal_range(
+ lhs_run_ends_array,
+ rhs_run_ends_array,
+ lhs_start,
+ rhs_start,
+ lhs_run_ends_array.len(),
+ );
+
+ // if run ends array are not the same return early without validating
+ // values array.
+ if !run_ends_equal {
+ return false;
+ }
+
+ // check values array are equal
+ equal_range(
+ lhs_values_array,
+ rhs_values_array,
+ lhs_start,
+ rhs_start,
+ rhs_values_array.len(),
+ )
+}
diff --git a/arrow-ipc/regen.sh b/arrow-ipc/regen.sh
index 9d384b6b6..8d8862ccc 100755
--- a/arrow-ipc/regen.sh
+++ b/arrow-ipc/regen.sh
@@ -18,15 +18,13 @@
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
-# Change to the toplevel Rust directory
-pushd $DIR/../../
+# Change to the toplevel `arrow-rs` directory
+pushd $DIR/../
echo "Build flatc from source ..."
FB_URL="https://github.com/google/flatbuffers"
-# https://github.com/google/flatbuffers/pull/6393
-FB_COMMIT="408cf5802415e1dea65fef7489a6c2f3740fb381"
-FB_DIR="rust/arrow/.flatbuffers"
+FB_DIR="arrow/.flatbuffers"
FLATC="$FB_DIR/bazel-bin/flatc"
if [ -z $(which bazel) ]; then
@@ -44,28 +42,21 @@ else
git -C $FB_DIR pull
fi
-echo "hard reset to $FB_COMMIT"
-git -C $FB_DIR reset --hard $FB_COMMIT
-
pushd $FB_DIR
echo "run: bazel build :flatc ..."
bazel build :flatc
popd
-FB_PATCH="rust/arrow/format-0ed34c83.patch"
-echo "Patch flatbuffer files with ${FB_PATCH} for cargo doc"
-echo "NOTE: the patch MAY need update in case of changes in format/*.fbs"
-git apply --check ${FB_PATCH} && git apply ${FB_PATCH}
# Execute the code generation:
-$FLATC --filename-suffix "" --rust -o rust/arrow/src/ipc/gen/ format/*.fbs
+$FLATC --filename-suffix "" --rust -o arrow-ipc/src/gen/ format/*.fbs
# Reset changes to format/
git checkout -- format
# Now the files are wrongly named so we have to change that.
popd
-pushd $DIR/src/ipc/gen
+pushd $DIR/src/gen
PREFIX=$(cat <<'HEREDOC'
// Licensed to the Apache Software Foundation (ASF) under one
@@ -94,9 +85,9 @@ use flatbuffers::EndianScalar;
HEREDOC
)
-SCHEMA_IMPORT="\nuse crate::ipc::gen::Schema::*;"
-SPARSE_TENSOR_IMPORT="\nuse crate::ipc::gen::SparseTensor::*;"
-TENSOR_IMPORT="\nuse crate::ipc::gen::Tensor::*;"
+SCHEMA_IMPORT="\nuse crate::gen::Schema::*;"
+SPARSE_TENSOR_IMPORT="\nuse crate::gen::SparseTensor::*;"
+TENSOR_IMPORT="\nuse crate::gen::Tensor::*;"
# For flatbuffer(1.12.0+), remove: use crate::${name}::\*;
names=("File" "Message" "Schema" "SparseTensor" "Tensor")
@@ -119,8 +110,9 @@ for f in `ls *.rs`; do
sed -i '' '/} \/\/ pub mod arrow/d' $f
sed -i '' '/} \/\/ pub mod apache/d' $f
sed -i '' '/} \/\/ pub mod org/d' $f
- sed -i '' '/use std::mem;/d' $f
- sed -i '' '/use std::cmp::Ordering;/d' $f
+ sed -i '' '/use core::mem;/d' $f
+ sed -i '' '/use core::cmp::Ordering;/d' $f
+ sed -i '' '/use self::flatbuffers::{EndianScalar, Follow};/d' $f
# required by flatc 1.12.0+
sed -i '' "/\#\!\[allow(unused_imports, dead_code)\]/d" $f
@@ -150,7 +142,7 @@ done
# Return back to base directory
popd
-cargo +stable fmt -- src/ipc/gen/*
+cargo +stable fmt -- src/gen/*
echo "DONE!"
echo "Please run 'cargo doc' and 'cargo test' with nightly and stable, "
diff --git a/arrow-ipc/src/convert.rs b/arrow-ipc/src/convert.rs
index c5681b0c8..aede8a448 100644
--- a/arrow-ipc/src/convert.rs
+++ b/arrow-ipc/src/convert.rs
@@ -364,6 +364,18 @@ pub(crate) fn get_data_type(field: crate::Field, may_be_dictionary: bool) -> Dat
DataType::Struct(fields)
}
+ crate::Type::RunEndEncoded => {
+ let children = field.children().unwrap();
+ if children.len() != 2 {
+ panic!(
+ "RunEndEncoded type should have exactly two children.
Found {}",
+ children.len()
+ )
+ }
+ let run_ends_field = children.get(0).into();
+ let values_field = children.get(1).into();
+ DataType::RunEndEncoded(Box::new(run_ends_field),
Box::new(values_field))
+ }
crate::Type::Map => {
let map = field.type_as_map().unwrap();
let children = field.children().unwrap();
@@ -710,7 +722,18 @@ pub(crate) fn get_fb_field_type<'a>(
children: Some(fbb.create_vector(&children[..])),
}
}
- RunEndEncoded(_, _) => todo!(),
+ RunEndEncoded(run_ends, values) => {
+ let run_ends_field = build_field(fbb, run_ends);
+ let values_field = build_field(fbb, values);
+ let children = vec![run_ends_field, values_field];
+ FBFieldType {
+ type_type: crate::Type::RunEndEncoded,
+ type_: crate::RunEndEncodedBuilder::new(fbb)
+ .finish()
+ .as_union_value(),
+ children: Some(fbb.create_vector(&children[..])),
+ }
+ }
Map(map_field, keys_sorted) => {
let child = build_field(fbb, map_field);
let mut field_type = crate::MapBuilder::new(fbb);
diff --git a/arrow-ipc/src/gen/Schema.rs b/arrow-ipc/src/gen/Schema.rs
index 6479bece7..cf3ea0bd4 100644
--- a/arrow-ipc/src/gen/Schema.rs
+++ b/arrow-ipc/src/gen/Schema.rs
@@ -735,13 +735,13 @@ pub const ENUM_MIN_TYPE: u8 = 0;
since = "2.0.0",
note = "Use associated constants instead. This will no longer be generated
in 2021."
)]
-pub const ENUM_MAX_TYPE: u8 = 21;
+pub const ENUM_MAX_TYPE: u8 = 22;
#[deprecated(
since = "2.0.0",
note = "Use associated constants instead. This will no longer be generated
in 2021."
)]
#[allow(non_camel_case_types)]
-pub const ENUM_VALUES_TYPE: [Type; 22] = [
+pub const ENUM_VALUES_TYPE: [Type; 23] = [
Type::NONE,
Type::Null,
Type::Int,
@@ -764,6 +764,7 @@ pub const ENUM_VALUES_TYPE: [Type; 22] = [
Type::LargeBinary,
Type::LargeUtf8,
Type::LargeList,
+ Type::RunEndEncoded,
];
/// ----------------------------------------------------------------------
@@ -796,9 +797,10 @@ impl Type {
pub const LargeBinary: Self = Self(19);
pub const LargeUtf8: Self = Self(20);
pub const LargeList: Self = Self(21);
+ pub const RunEndEncoded: Self = Self(22);
pub const ENUM_MIN: u8 = 0;
- pub const ENUM_MAX: u8 = 21;
+ pub const ENUM_MAX: u8 = 22;
pub const ENUM_VALUES: &'static [Self] = &[
Self::NONE,
Self::Null,
@@ -822,6 +824,7 @@ impl Type {
Self::LargeBinary,
Self::LargeUtf8,
Self::LargeList,
+ Self::RunEndEncoded,
];
/// Returns the variant's name or "" if unknown.
pub fn variant_name(self) -> Option<&'static str> {
@@ -848,6 +851,7 @@ impl Type {
Self::LargeBinary => Some("LargeBinary"),
Self::LargeUtf8 => Some("LargeUtf8"),
Self::LargeList => Some("LargeList"),
+ Self::RunEndEncoded => Some("RunEndEncoded"),
_ => None,
}
}
@@ -2646,6 +2650,90 @@ impl core::fmt::Debug for Bool<'_> {
ds.finish()
}
}
+pub enum RunEndEncodedOffset {}
+#[derive(Copy, Clone, PartialEq)]
+
+/// Contains two child arrays, run_ends and values.
+/// The run_ends child array must be a 16/32/64-bit integer array
+/// which encodes the indices at which the run with the value in
+/// each corresponding index in the values child array ends.
+/// Like list/struct types, the value array can be of any type.
+pub struct RunEndEncoded<'a> {
+ pub _tab: flatbuffers::Table<'a>,
+}
+
+impl<'a> flatbuffers::Follow<'a> for RunEndEncoded<'a> {
+ type Inner = RunEndEncoded<'a>;
+ #[inline]
+ unsafe fn follow(buf: &'a [u8], loc: usize) -> Self::Inner {
+ Self {
+ _tab: flatbuffers::Table::new(buf, loc),
+ }
+ }
+}
+
+impl<'a> RunEndEncoded<'a> {
+ #[inline]
+ pub unsafe fn init_from_table(table: flatbuffers::Table<'a>) -> Self {
+ RunEndEncoded { _tab: table }
+ }
+ #[allow(unused_mut)]
+ pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr>(
+ _fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr>,
+ _args: &'args RunEndEncodedArgs,
+ ) -> flatbuffers::WIPOffset<RunEndEncoded<'bldr>> {
+ let mut builder = RunEndEncodedBuilder::new(_fbb);
+ builder.finish()
+ }
+}
+
+impl flatbuffers::Verifiable for RunEndEncoded<'_> {
+ #[inline]
+ fn run_verifier(
+ v: &mut flatbuffers::Verifier,
+ pos: usize,
+ ) -> Result<(), flatbuffers::InvalidFlatbuffer> {
+ use flatbuffers::Verifiable;
+ v.visit_table(pos)?.finish();
+ Ok(())
+ }
+}
+pub struct RunEndEncodedArgs {}
+impl<'a> Default for RunEndEncodedArgs {
+ #[inline]
+ fn default() -> Self {
+ RunEndEncodedArgs {}
+ }
+}
+
+pub struct RunEndEncodedBuilder<'a: 'b, 'b> {
+ fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a>,
+ start_: flatbuffers::WIPOffset<flatbuffers::TableUnfinishedWIPOffset>,
+}
+impl<'a: 'b, 'b> RunEndEncodedBuilder<'a, 'b> {
+ #[inline]
+ pub fn new(
+ _fbb: &'b mut flatbuffers::FlatBufferBuilder<'a>,
+ ) -> RunEndEncodedBuilder<'a, 'b> {
+ let start = _fbb.start_table();
+ RunEndEncodedBuilder {
+ fbb_: _fbb,
+ start_: start,
+ }
+ }
+ #[inline]
+ pub fn finish(self) -> flatbuffers::WIPOffset<RunEndEncoded<'a>> {
+ let o = self.fbb_.end_table(self.start_);
+ flatbuffers::WIPOffset::new(o.value())
+ }
+}
+
+impl core::fmt::Debug for RunEndEncoded<'_> {
+ fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
+ let mut ds = f.debug_struct("RunEndEncoded");
+ ds.finish()
+ }
+}
pub enum DecimalOffset {}
#[derive(Copy, Clone, PartialEq)]
@@ -4316,6 +4404,21 @@ impl<'a> Field<'a> {
None
}
}
+
+ #[inline]
+ #[allow(non_snake_case)]
+ pub fn type_as_run_end_encoded(&self) -> Option<RunEndEncoded<'a>> {
+ if self.type_type() == Type::RunEndEncoded {
+ self.type_().map(|t| {
+ // Safety:
+ // Created from a valid Table for this object
+ // Which contains a valid union in this slot
+ unsafe { RunEndEncoded::init_from_table(t) }
+ })
+ } else {
+ None
+ }
+ }
}
impl flatbuffers::Verifiable for Field<'_> {
@@ -4351,6 +4454,7 @@ impl flatbuffers::Verifiable for Field<'_> {
Type::LargeBinary =>
v.verify_union_variant::<flatbuffers::ForwardsUOffset<LargeBinary>>("Type::LargeBinary",
pos),
Type::LargeUtf8 =>
v.verify_union_variant::<flatbuffers::ForwardsUOffset<LargeUtf8>>("Type::LargeUtf8",
pos),
Type::LargeList =>
v.verify_union_variant::<flatbuffers::ForwardsUOffset<LargeList>>("Type::LargeList",
pos),
+ Type::RunEndEncoded =>
v.verify_union_variant::<flatbuffers::ForwardsUOffset<RunEndEncoded>>("Type::RunEndEncoded",
pos),
_ => Ok(()),
}
})?
@@ -4686,6 +4790,16 @@ impl core::fmt::Debug for Field<'_> {
)
}
}
+ Type::RunEndEncoded => {
+ if let Some(x) = self.type_as_run_end_encoded() {
+ ds.field("type_", &x)
+ } else {
+ ds.field(
+ "type_",
+ &"InvalidFlatbuffer: Union discriminant does not match
value.",
+ )
+ }
+ }
_ => {
let x: Option<()> = None;
ds.field("type_", &x)
diff --git a/arrow-ipc/src/gen/SparseTensor.rs b/arrow-ipc/src/gen/SparseTensor.rs
index c5e06c30e..83fed4873 100644
--- a/arrow-ipc/src/gen/SparseTensor.rs
+++ b/arrow-ipc/src/gen/SparseTensor.rs
@@ -1524,6 +1524,20 @@ impl<'a> SparseTensor<'a> {
}
}
+ #[inline]
+ #[allow(non_snake_case)]
+ pub fn type_as_run_end_encoded(&self) -> Option<RunEndEncoded<'a>> {
+ if self.type_type() == Type::RunEndEncoded {
+ let u = self.type_();
+ // Safety:
+ // Created from a valid Table for this object
+ // Which contains a valid union in this slot
+ Some(unsafe { RunEndEncoded::init_from_table(u) })
+ } else {
+ None
+ }
+ }
+
#[inline]
#[allow(non_snake_case)]
pub fn sparseIndex_as_sparse_tensor_index_coo(
@@ -1604,6 +1618,7 @@ impl flatbuffers::Verifiable for SparseTensor<'_> {
Type::LargeBinary =>
v.verify_union_variant::<flatbuffers::ForwardsUOffset<LargeBinary>>("Type::LargeBinary",
pos),
Type::LargeUtf8 =>
v.verify_union_variant::<flatbuffers::ForwardsUOffset<LargeUtf8>>("Type::LargeUtf8",
pos),
Type::LargeList =>
v.verify_union_variant::<flatbuffers::ForwardsUOffset<LargeList>>("Type::LargeList",
pos),
+ Type::RunEndEncoded =>
v.verify_union_variant::<flatbuffers::ForwardsUOffset<RunEndEncoded>>("Type::RunEndEncoded",
pos),
_ => Ok(()),
}
})?
@@ -1943,6 +1958,16 @@ impl core::fmt::Debug for SparseTensor<'_> {
)
}
}
+ Type::RunEndEncoded => {
+ if let Some(x) = self.type_as_run_end_encoded() {
+ ds.field("type_", &x)
+ } else {
+ ds.field(
+ "type_",
+ &"InvalidFlatbuffer: Union discriminant does not match
value.",
+ )
+ }
+ }
_ => {
let x: Option<()> = None;
ds.field("type_", &x)
diff --git a/arrow-ipc/src/gen/Tensor.rs b/arrow-ipc/src/gen/Tensor.rs
index 954ebd290..43133fec0 100644
--- a/arrow-ipc/src/gen/Tensor.rs
+++ b/arrow-ipc/src/gen/Tensor.rs
@@ -565,6 +565,20 @@ impl<'a> Tensor<'a> {
None
}
}
+
+ #[inline]
+ #[allow(non_snake_case)]
+ pub fn type_as_run_end_encoded(&self) -> Option<RunEndEncoded<'a>> {
+ if self.type_type() == Type::RunEndEncoded {
+ let u = self.type_();
+ // Safety:
+ // Created from a valid Table for this object
+ // Which contains a valid union in this slot
+ Some(unsafe { RunEndEncoded::init_from_table(u) })
+ } else {
+ None
+ }
+ }
}
impl flatbuffers::Verifiable for Tensor<'_> {
@@ -598,6 +612,7 @@ impl flatbuffers::Verifiable for Tensor<'_> {
Type::LargeBinary =>
v.verify_union_variant::<flatbuffers::ForwardsUOffset<LargeBinary>>("Type::LargeBinary",
pos),
Type::LargeUtf8 =>
v.verify_union_variant::<flatbuffers::ForwardsUOffset<LargeUtf8>>("Type::LargeUtf8",
pos),
Type::LargeList =>
v.verify_union_variant::<flatbuffers::ForwardsUOffset<LargeList>>("Type::LargeList",
pos),
+ Type::RunEndEncoded =>
v.verify_union_variant::<flatbuffers::ForwardsUOffset<RunEndEncoded>>("Type::RunEndEncoded",
pos),
_ => Ok(()),
}
})?
@@ -907,6 +922,16 @@ impl core::fmt::Debug for Tensor<'_> {
)
}
}
+ Type::RunEndEncoded => {
+ if let Some(x) = self.type_as_run_end_encoded() {
+ ds.field("type_", &x)
+ } else {
+ ds.field(
+ "type_",
+ &"InvalidFlatbuffer: Union discriminant does not match
value.",
+ )
+ }
+ }
_ => {
let x: Option<()> = None;
ds.field("type_", &x)
diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs
index 17f521e42..6842474fb 100644
--- a/arrow-ipc/src/reader.rs
+++ b/arrow-ipc/src/reader.rs
@@ -194,6 +194,50 @@ fn create_array(
};
Arc::new(struct_array)
}
+ RunEndEncoded(run_ends_field, values_field) => {
+ let run_node = nodes.get(node_index);
+ node_index += 1;
+
+ let run_ends_triple = create_array(
+ nodes,
+ run_ends_field,
+ data,
+ buffers,
+ dictionaries_by_id,
+ node_index,
+ buffer_index,
+ compression_codec,
+ metadata,
+ )?;
+ node_index = run_ends_triple.1;
+ buffer_index = run_ends_triple.2;
+
+ let values_triple = create_array(
+ nodes,
+ values_field,
+ data,
+ buffers,
+ dictionaries_by_id,
+ node_index,
+ buffer_index,
+ compression_codec,
+ metadata,
+ )?;
+ node_index = values_triple.1;
+ buffer_index = values_triple.2;
+
+ let run_array_length = run_node.length() as usize;
+ let run_array_null_count = run_node.null_count() as usize;
+ let data = ArrayData::builder(data_type.clone())
+ .len(run_array_length)
+ .null_count(run_array_null_count)
+ .offset(0)
+ .add_child_data(run_ends_triple.0.into_data())
+ .add_child_data(values_triple.0.into_data())
+ .build()?;
+
+ make_array(data)
+ }
// Create dictionary array from RecordBatch
Dictionary(_, _) => {
let index_node = nodes.get(node_index);
@@ -361,6 +405,17 @@ fn skip_field(
buffer_index = tuple.1;
}
}
+ RunEndEncoded(run_ends_field, values_field) => {
+ node_index += 1;
+
+ let tuple = skip_field(run_ends_field.data_type(), node_index,
buffer_index)?;
+ node_index = tuple.0;
+ buffer_index = tuple.1;
+
+ let tuple = skip_field(values_field.data_type(), node_index,
buffer_index)?;
+ node_index = tuple.0;
+ buffer_index = tuple.1;
+ }
Dictionary(_, _) => {
node_index += 1;
buffer_index += 2;
@@ -1189,9 +1244,11 @@ impl<R: Read> RecordBatchReader for StreamReader<R> {
#[cfg(test)]
mod tests {
+ use crate::writer::unslice_run_array;
+
use super::*;
- use arrow_array::builder::UnionBuilder;
+ use arrow_array::builder::{PrimitiveRunBuilder, UnionBuilder};
use arrow_array::types::*;
use arrow_buffer::ArrowNativeType;
use arrow_data::ArrayDataBuilder;
@@ -1227,6 +1284,11 @@ mod tests {
];
let struct_data_type = DataType::Struct(struct_fields);
+ let run_encoded_data_type = DataType::RunEndEncoded(
+ Box::new(Field::new("run_ends", DataType::Int16, false)),
+ Box::new(Field::new("values", DataType::Int32, true)),
+ );
+
// define schema
Schema::new(vec![
Field::new("f0", DataType::UInt32, false),
@@ -1239,9 +1301,10 @@ mod tests {
Field::new("f7", DataType::FixedSizeBinary(3), true),
Field::new("f8", fixed_size_list_data_type, false),
Field::new("f9", struct_data_type, false),
- Field::new("f10", DataType::Boolean, false),
- Field::new("f11", dict_data_type, false),
- Field::new("f12", DataType::Utf8, false),
+ Field::new("f10", run_encoded_data_type, false),
+ Field::new("f11", DataType::Boolean, false),
+ Field::new("f12", dict_data_type, false),
+ Field::new("f13", DataType::Utf8, false),
])
}
@@ -1296,14 +1359,19 @@ mod tests {
.unwrap();
let array9: ArrayRef = Arc::new(StructArray::from(array9));
- let array10 = BooleanArray::from(vec![false, false, true]);
+ let array10_input = vec![Some(1_i32), None, None];
+ let mut array10_builder = PrimitiveRunBuilder::<Int16Type,
Int32Type>::new();
+ array10_builder.extend(array10_input.into_iter());
+ let array10 = array10_builder.finish();
+
+ let array11 = BooleanArray::from(vec![false, false, true]);
- let array11_values = StringArray::from(vec!["x", "yy", "zzz"]);
- let array11_keys = Int8Array::from_iter_values([1, 1, 2]);
- let array11 =
- DictionaryArray::<Int8Type>::try_new(&array11_keys,
&array11_values).unwrap();
+ let array12_values = StringArray::from(vec!["x", "yy", "zzz"]);
+ let array12_keys = Int8Array::from_iter_values([1, 1, 2]);
+ let array12 =
+ DictionaryArray::<Int8Type>::try_new(&array12_keys,
&array12_values).unwrap();
- let array12 = StringArray::from(vec!["a", "bb", "ccc"]);
+ let array13 = StringArray::from(vec!["a", "bb", "ccc"]);
// create record batch
RecordBatch::try_new(
@@ -1322,6 +1390,7 @@ mod tests {
Arc::new(array10),
Arc::new(array11),
Arc::new(array12),
+ Arc::new(array13),
],
)
.unwrap()
@@ -1510,6 +1579,43 @@ mod tests {
check_union_with_builder(UnionBuilder::new_sparse());
}
+ #[test]
+ fn test_roundtrip_stream_run_array_sliced() {
+ let run_array_1: Int32RunArray = vec!["a", "a", "a", "b", "b", "c",
"c", "c"]
+ .into_iter()
+ .collect();
+ let run_array_1_sliced = run_array_1.slice(2, 5); // offset 2, length 5: logical values a, b, b, c, c
+
+ let run_array_2_inupt = vec![Some(1_i32), None, None, Some(2),
Some(2)];
+ let mut run_array_2_builder = PrimitiveRunBuilder::<Int16Type,
Int32Type>::new();
+ run_array_2_builder.extend(run_array_2_inupt.into_iter());
+ let run_array_2 = run_array_2_builder.finish();
+
+ let schema = Arc::new(Schema::new(vec![
+ Field::new(
+ "run_array_1_sliced",
+ run_array_1_sliced.data_type().clone(),
+ false,
+ ),
+ Field::new("run_array_2", run_array_2.data_type().clone(), false),
+ ]));
+ let input_batch = RecordBatch::try_new(
+ schema,
+ vec![Arc::new(run_array_1_sliced.clone()), Arc::new(run_array_2)],
+ )
+ .unwrap();
+ let output_batch = roundtrip_ipc_stream(&input_batch);
+
+ // As partial comparison not yet supported for run arrays, the sliced
run array
+ // has to be unsliced before comparing with the output. the second run
array
+ // can be compared as such.
+ assert_eq!(input_batch.column(1), output_batch.column(1)); // column 1 was never sliced, so it is compared directly
+
+ let run_array_1_unsliced =
+ unslice_run_array(run_array_1_sliced.into_data()).unwrap();
+ assert_eq!(run_array_1_unsliced, output_batch.column(0).into_data());
+ }
+
#[test]
fn test_roundtrip_stream_nested_dict() {
let xs = vec!["AA", "BB", "AA", "CC", "BB"];
diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs
index 8835cb49f..f01934015 100644
--- a/arrow-ipc/src/writer.rs
+++ b/arrow-ipc/src/writer.rs
@@ -24,14 +24,15 @@ use std::cmp::min;
use std::collections::HashMap;
use std::io::{BufWriter, Write};
+use arrow_array::types::{Int16Type, Int32Type, Int64Type, RunEndIndexType};
use flatbuffers::FlatBufferBuilder;
use arrow_array::builder::BufferBuilder;
use arrow_array::cast::*;
use arrow_array::*;
use arrow_buffer::bit_util;
-use arrow_buffer::{Buffer, MutableBuffer};
-use arrow_data::{layout, ArrayData, BufferSpec};
+use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer};
+use arrow_data::{layout, ArrayData, ArrayDataBuilder, BufferSpec};
use arrow_schema::*;
use crate::compression::CompressionCodec;
@@ -218,6 +219,24 @@ impl IpcDataGenerator {
)?;
}
}
+ DataType::RunEndEncoded(_, values) => {
+ if column.data().child_data().len() != 2 {
+ return Err(ArrowError::InvalidArgumentError(format!(
+ "The run encoded array should have exactly two child
arrays. Found {}",
+ column.data().child_data().len()
+ )));
+ }
+ // The run_ends array is not expected to be dictionary
encoded. Hence encode dictionaries
+ // only for values array.
+ let values_array =
make_array(column.data().child_data()[1].clone());
+ self.encode_dictionaries(
+ values,
+ &values_array,
+ encoded_dictionaries,
+ dictionary_tracker,
+ write_options,
+ )?;
+ }
DataType::List(field) => {
let list = as_list_array(column);
self.encode_dictionaries(
@@ -533,6 +552,94 @@ impl IpcDataGenerator {
}
}
+pub(crate) fn unslice_run_array(arr: ArrayData) -> Result<ArrayData,
ArrowError> {
+ match arr.data_type() {
+ DataType::RunEndEncoded(k, _) => match k.data_type() { // dispatch on the run_ends field type
+ DataType::Int16 => Ok(into_zero_offset_run_array(
+ RunArray::<Int16Type>::from(arr),
+ )?
+ .into_data()),
+ DataType::Int32 => Ok(into_zero_offset_run_array(
+ RunArray::<Int32Type>::from(arr),
+ )?
+ .into_data()),
+ DataType::Int64 => Ok(into_zero_offset_run_array(
+ RunArray::<Int64Type>::from(arr),
+ )?
+ .into_data()),
+ d => unreachable!("Unexpected data type {d}"), // any other run_ends type panics here instead of returning Err
+ },
+ d => Err(ArrowError::InvalidArgumentError(format!(
+ "The given array is not a run array. Data type of given array: {d}"
+ ))),
+ }
+}
+
+// Returns a `RunArray` with zero offset and length matching the last value
+// in run_ends array.
+fn into_zero_offset_run_array<R: RunEndIndexType>(
+ run_array: RunArray<R>,
+) -> Result<RunArray<R>, ArrowError> {
+ if run_array.offset() == 0
+ && run_array.len() == RunArray::<R>::logical_len(run_array.run_ends())
+ {
+ return Ok(run_array);
+ }
+ // The physical index of original run_ends array from which the
`ArrayData`is sliced.
+ let start_physical_index = run_array
+ .get_zero_offset_physical_index(run_array.offset())
+ .unwrap();
+
+ // The last logical index of original run_ends array until which the
`ArrayData` is sliced.
+ let end_logical_index = run_array.offset() + run_array.len() - 1; // NOTE(review): looks like this underflows when offset() + len() == 0 - confirm empty slices cannot reach here
+ // The physical index of original run_ends array until which the
`ArrayData`is sliced.
+ let end_physical_index = run_array
+ .get_zero_offset_physical_index(end_logical_index)
+ .unwrap();
+
+ let physical_length = end_physical_index - start_physical_index + 1;
+
+ // build new run_ends array by subtracting offset from run ends.
+ let mut builder = BufferBuilder::<R::Native>::new(physical_length);
+ for ix in start_physical_index..end_physical_index { // exclusive upper bound: the final run end is appended after the loop
+ let run_end_value = unsafe {
+ // Safety:
+ // start_physical_index and end_physical_index are within
+ // run_ends array bounds.
+ run_array.run_ends().value_unchecked(ix).as_usize()
+ };
+ let run_end_value = run_end_value - run_array.offset();
+ builder.append(R::Native::from_usize(run_end_value).unwrap());
+ }
+ builder.append(R::Native::from_usize(run_array.len()).unwrap()); // the last run ends at the sliced logical length
+ let new_run_ends = unsafe {
+ // Safety:
+ // The function builds a valid run_ends array and hence need not be
validated.
+ ArrayDataBuilder::new(run_array.run_ends().data_type().clone())
+ .len(physical_length)
+ .null_count(0)
+ .add_buffer(builder.finish())
+ .build_unchecked()
+ };
+
+ // build new values by slicing physical indices.
+ let new_values = run_array
+ .values()
+ .slice(start_physical_index, physical_length)
+ .into_data();
+
+ let builder = ArrayDataBuilder::new(run_array.data_type().clone())
+ .len(run_array.len())
+ .add_child_data(new_run_ends)
+ .add_child_data(new_values);
+ let array_data = unsafe {
+ // Safety:
+ // This function builds a valid run array and hence can skip
validation.
+ builder.build_unchecked()
+ };
+ Ok(array_data.into())
+}
+
/// Keeps track of dictionaries that have been written, to avoid emitting the
same dictionary
/// multiple times. Can optionally error if an update to an existing
dictionary is attempted, which
/// isn't allowed in the `FileWriter`.
@@ -968,11 +1075,15 @@ fn write_continuation<W: Write>(
/// In V4, null types have no validity bitmap
/// In V5 and later, null and union types have no validity bitmap
+/// Run end encoded type has no validity bitmap (only excluded on the V5-and-later branch below).
fn has_validity_bitmap(data_type: &DataType, write_options: &IpcWriteOptions)
-> bool {
if write_options.metadata_version < crate::MetadataVersion::V5 {
!matches!(data_type, DataType::Null)
} else {
- !matches!(data_type, DataType::Null | DataType::Union(_, _, _))
+ !matches!(
+ data_type,
+ DataType::Null | DataType::Union(_, _, _) |
DataType::RunEndEncoded(_, _)
+ )
}
}
@@ -1242,24 +1353,45 @@ fn write_array_data(
}
}
- if !matches!(array_data.data_type(), DataType::Dictionary(_, _)) {
- // recursively write out nested structures
- for data_ref in array_data.child_data() {
- // write the nested data (e.g list data)
- offset = write_array_data(
- data_ref,
- buffers,
- arrow_data,
- nodes,
- offset,
- data_ref.len(),
- data_ref.null_count(),
- compression_codec,
- write_options,
- )?;
+ match array_data.data_type() {
+ DataType::Dictionary(_, _) => {}
+ DataType::RunEndEncoded(_, _) => {
+ // unslice the run encoded array.
+ let arr = unslice_run_array(array_data.clone())?;
+ // recursively write out nested structures
+ for data_ref in arr.child_data() {
+ // write the nested data (e.g list data)
+ offset = write_array_data(
+ data_ref,
+ buffers,
+ arrow_data,
+ nodes,
+ offset,
+ data_ref.len(),
+ data_ref.null_count(),
+ compression_codec,
+ write_options,
+ )?;
+ }
+ }
+ _ => {
+ // recursively write out nested structures
+ for data_ref in array_data.child_data() {
+ // write the nested data (e.g list data)
+ offset = write_array_data(
+ data_ref,
+ buffers,
+ arrow_data,
+ nodes,
+ offset,
+ data_ref.len(),
+ data_ref.null_count(),
+ compression_codec,
+ write_options,
+ )?;
+ }
}
}
-
Ok(offset)
}
@@ -1322,6 +1454,7 @@ mod tests {
use crate::MetadataVersion;
use crate::reader::*;
+ use arrow_array::builder::PrimitiveRunBuilder;
use arrow_array::builder::UnionBuilder;
use arrow_array::types::*;
use arrow_schema::DataType;
@@ -1992,4 +2125,62 @@ mod tests {
let batch2 = reader.next().unwrap().unwrap();
assert_eq!(batch, batch2);
}
+
+ #[test]
+ fn test_run_array_unslice() {
+ let total_len = 80; // 32 runs cycling through repeats [3, 4, 1, 2]: 8 * (3 + 4 + 1 + 2) = 80
+ let vals: Vec<Option<i32>> =
+ vec![Some(1), None, Some(2), Some(3), Some(4), None, Some(5)];
+ let repeats: Vec<usize> = vec![3, 4, 1, 2];
+ let mut input_array: Vec<Option<i32>> = Vec::with_capacity(total_len);
+ for ix in 0_usize..32 { // each iteration appends one run of `repeat` copies of `val`
+ let repeat: usize = repeats[ix % repeats.len()];
+ let val: Option<i32> = vals[ix % vals.len()];
+ input_array.resize(input_array.len() + repeat, val);
+ }
+
+ // Encode the input_array to run array
+ let mut builder =
+ PrimitiveRunBuilder::<Int16Type,
Int32Type>::with_capacity(input_array.len());
+ builder.extend(input_array.iter().copied());
+ let run_array = builder.finish();
+
+ // test for all slice lengths.
+ for slice_len in 1..=total_len {
+ // test for offset = 0, slice length = slice_len
+ let sliced_run_array: RunArray<Int16Type> =
+ run_array.slice(0, slice_len).into_data().into();
+
+ // Create unsliced run array.
+ let unsliced_run_array =
+ into_zero_offset_run_array(sliced_run_array).unwrap();
+ let typed = unsliced_run_array
+ .downcast::<PrimitiveArray<Int32Type>>()
+ .unwrap();
+ let expected: Vec<Option<i32>> =
+ input_array.iter().take(slice_len).copied().collect();
+ let actual: Vec<Option<i32>> = typed.into_iter().collect();
+ assert_eq!(expected, actual);
+
+ // test for offset = total_len - slice_len, length = slice_len
+ let sliced_run_array: RunArray<Int16Type> = run_array
+ .slice(total_len - slice_len, slice_len)
+ .into_data()
+ .into();
+
+ // Create unsliced run array.
+ let unsliced_run_array =
+ into_zero_offset_run_array(sliced_run_array).unwrap();
+ let typed = unsliced_run_array
+ .downcast::<PrimitiveArray<Int32Type>>()
+ .unwrap();
+ let expected: Vec<Option<i32>> = input_array
+ .iter()
+ .skip(total_len - slice_len)
+ .copied()
+ .collect();
+ let actual: Vec<Option<i32>> = typed.into_iter().collect();
+ assert_eq!(expected, actual);
+ }
+ }
}
diff --git a/arrow-select/src/take.rs b/arrow-select/src/take.rs
index f8668b56e..f8383bbe3 100644
--- a/arrow-select/src/take.rs
+++ b/arrow-select/src/take.rs
@@ -832,7 +832,6 @@ macro_rules! primitive_run_take {
/// for e.g. an input `RunArray{ run_ends = [2,4,6,8], values=[1,2,1,2] }` and
`indices=[2,7]`
/// would be converted to `physical_indices=[1,3]` which will be used to build
/// output `RunArray{ run_ends=[2], values=[2] }`
-
fn take_run<T, I>(
run_array: &RunArray<T>,
logical_indices: &PrimitiveArray<I>,
diff --git a/format/Schema.fbs b/format/Schema.fbs
index 7ee827b5d..6337f72ec 100644
--- a/format/Schema.fbs
+++ b/format/Schema.fbs
@@ -19,8 +19,9 @@
/// Format Version History.
/// Version 1.0 - Forward and backwards compatibility guaranteed.
-/// Version 1.1 - Add Decimal256 (No format release).
-/// Version 1.2 (Pending)- Add Interval MONTH_DAY_NANO
+/// Version 1.1 - Add Decimal256.
+/// Version 1.2 - Add Interval MONTH_DAY_NANO
+/// Version 1.3 - Add Run-End Encoded.
namespace org.apache.arrow.flatbuf;
@@ -178,6 +179,14 @@ table FixedSizeBinary {
table Bool {
}
+/// Contains two child arrays, run_ends and values.
+/// The run_ends child array must be a 16/32/64-bit integer array
+/// which encodes the indices at which the run with the value in
+/// each corresponding index in the values child array ends.
+/// Like list/struct types, the values child array can be of any type.
+table RunEndEncoded {
+}
+
/// Exact decimal value represented as an integer value in two's
/// complement. Currently only 128-bit (16-byte) and 256-bit (32-byte) integers
/// are used. The representation uses the endianness indicated
@@ -417,6 +426,7 @@ union Type {
LargeBinary,
LargeUtf8,
LargeList,
+ RunEndEncoded,
}
/// ----------------------------------------------------------------------