alamb commented on code in PR #1716:
URL: https://github.com/apache/arrow-rs/pull/1716#discussion_r878681742
##########
parquet/src/arrow/mod.rs:
##########
@@ -133,11 +140,71 @@ pub use self::arrow_reader::ParquetFileArrowReader;
pub use self::arrow_writer::ArrowWriter;
#[cfg(feature = "async")]
pub use self::async_reader::ParquetRecordBatchStreamBuilder;
+use crate::schema::types::SchemaDescriptor;
pub use self::schema::{
arrow_to_parquet_schema, parquet_to_arrow_schema,
parquet_to_arrow_schema_by_columns,
- parquet_to_arrow_schema_by_root_columns,
};
/// Schema metadata key used to store serialized Arrow IPC schema
pub const ARROW_SCHEMA_META_KEY: &str = "ARROW:schema";
+
+/// A [`ProjectionMask`] identifies a set of columns within a potentially
nested schema to project
+#[derive(Debug, Clone)]
+pub struct ProjectionMask {
+ /// A mask of
+ mask: Option<Vec<bool>>,
+}
+
+impl ProjectionMask {
+ /// Create a [`ProjectionMask`] which selects all columns
+ pub fn all() -> Self {
+ Self { mask: None }
+ }
+
+ /// Create a [`ProjectionMask`] which selects only the specified leaf
columns
Review Comment:
Can you please explain (or provide a link to something that explains) leaves
and roots and what "order" they are in? I think it refers to the parquet schema
(or maybe the arrow schema and types within Structs / Lists / other nested
types?)
##########
parquet/src/arrow/schema.rs:
##########
@@ -51,74 +52,18 @@ pub fn parquet_to_arrow_schema(
) -> Result<Schema> {
parquet_to_arrow_schema_by_columns(
parquet_schema,
- 0..parquet_schema.columns().len(),
+ ProjectionMask::all(),
Review Comment:
❤️
##########
parquet/src/arrow/mod.rs:
##########
@@ -133,11 +140,71 @@ pub use self::arrow_reader::ParquetFileArrowReader;
pub use self::arrow_writer::ArrowWriter;
#[cfg(feature = "async")]
pub use self::async_reader::ParquetRecordBatchStreamBuilder;
+use crate::schema::types::SchemaDescriptor;
pub use self::schema::{
arrow_to_parquet_schema, parquet_to_arrow_schema,
parquet_to_arrow_schema_by_columns,
- parquet_to_arrow_schema_by_root_columns,
};
/// Schema metadata key used to store serialized Arrow IPC schema
pub const ARROW_SCHEMA_META_KEY: &str = "ARROW:schema";
+
+/// A [`ProjectionMask`] identifies a set of columns within a potentially
nested schema to project
+#[derive(Debug, Clone)]
+pub struct ProjectionMask {
+ /// A mask of
+ mask: Option<Vec<bool>>,
+}
+
+impl ProjectionMask {
+ /// Create a [`ColumnMask`] which selects all columns
+ pub fn all() -> Self {
+ Self { mask: None }
+ }
+
+ /// Create a [`ColumnMask`] which selects only the specified leaf columns
+ ///
+ /// Note: repeated or out of order indices will not impact the final mask
+ ///
+ /// i.e. `[0, 1, 2]` will construct the same mask as `[1, 0, 0, 2]`
+ pub fn leaves(
+ schema: &SchemaDescriptor,
Review Comment:
As long as a useful error is produced, I agree this is fine behavior
##########
parquet/src/arrow/schema.rs:
##########
@@ -1188,9 +1112,9 @@ mod tests {
// required int64 leaf5;
let parquet_schema =
SchemaDescriptor::new(Arc::new(parquet_group_type));
+ let mask = ProjectionMask::leaves(&parquet_schema, [3, 0, 4]);
Review Comment:
this is much nicer to read / reason about
##########
parquet/src/arrow/mod.rs:
##########
@@ -133,11 +140,71 @@ pub use self::arrow_reader::ParquetFileArrowReader;
pub use self::arrow_writer::ArrowWriter;
#[cfg(feature = "async")]
pub use self::async_reader::ParquetRecordBatchStreamBuilder;
+use crate::schema::types::SchemaDescriptor;
pub use self::schema::{
arrow_to_parquet_schema, parquet_to_arrow_schema,
parquet_to_arrow_schema_by_columns,
- parquet_to_arrow_schema_by_root_columns,
};
/// Schema metadata key used to store serialized Arrow IPC schema
pub const ARROW_SCHEMA_META_KEY: &str = "ARROW:schema";
+
+/// A [`ProjectionMask`] identifies a set of columns within a potentially
nested schema to project
+#[derive(Debug, Clone)]
+pub struct ProjectionMask {
+ /// A mask of
Review Comment:
The comment seems to be truncated.
Also, since we have `Bitmap` and all the associated handling in Arrow, I
wonder if it is worth using that (though a `Vec<bool>` is nice and simple).
##########
parquet/src/arrow/schema.rs:
##########
@@ -51,74 +52,18 @@ pub fn parquet_to_arrow_schema(
) -> Result<Schema> {
parquet_to_arrow_schema_by_columns(
parquet_schema,
- 0..parquet_schema.columns().len(),
+ ProjectionMask::all(),
key_value_metadata,
)
}
-/// Convert parquet schema to arrow schema including optional metadata,
-/// only preserving some root columns.
-/// This is useful if we have columns `a.b`, `a.c.e` and `a.d`,
-/// and want `a` with all its child fields
-pub fn parquet_to_arrow_schema_by_root_columns<T>(
- parquet_schema: &SchemaDescriptor,
- column_indices: T,
- key_value_metadata: Option<&Vec<KeyValue>>,
-) -> Result<Schema>
-where
- T: IntoIterator<Item = usize>,
-{
- // Reconstruct the index ranges of the parent columns
- // An Arrow struct gets represented by 1+ columns based on how many child
fields the
- // struct has. This means that getting fields 1 and 2 might return the
struct twice,
- // if field 1 is the struct having say 3 fields, and field 2 is a
primitive.
- //
- // The below gets the parent columns, and counts the number of child
fields in each parent,
- // such that we would end up with:
- // - field 1 - columns: [0, 1, 2]
- // - field 2 - columns: [3]
- let mut parent_columns = vec![];
- let mut curr_name = "";
- let mut prev_name = "";
- let mut indices = vec![];
- (0..(parquet_schema.num_columns())).for_each(|i| {
- let p_type = parquet_schema.get_column_root(i);
- curr_name = p_type.get_basic_info().name();
- if prev_name.is_empty() {
- // first index
- indices.push(i);
- prev_name = curr_name;
- } else if curr_name != prev_name {
- prev_name = curr_name;
- parent_columns.push((curr_name.to_string(), indices.clone()));
- indices = vec![i];
- } else {
- indices.push(i);
- }
- });
- // push the last column if indices has values
- if !indices.is_empty() {
- parent_columns.push((curr_name.to_string(), indices));
- }
-
- // gather the required leaf columns
- let leaf_columns = column_indices
- .into_iter()
- .flat_map(|i| parent_columns[i].1.clone());
-
- parquet_to_arrow_schema_by_columns(parquet_schema, leaf_columns,
key_value_metadata)
-}
-
/// Convert parquet schema to arrow schema including optional metadata,
/// only preserving some leaf columns.
-pub fn parquet_to_arrow_schema_by_columns<T>(
+pub fn parquet_to_arrow_schema_by_columns(
parquet_schema: &SchemaDescriptor,
- column_indices: T,
+ mask: ProjectionMask,
Review Comment:
I wonder if this needs an owned mask or if it could be taken by reference
```suggestion
mask: &ProjectionMask,
```
##########
parquet/src/arrow/mod.rs:
##########
@@ -133,11 +140,71 @@ pub use self::arrow_reader::ParquetFileArrowReader;
pub use self::arrow_writer::ArrowWriter;
#[cfg(feature = "async")]
pub use self::async_reader::ParquetRecordBatchStreamBuilder;
+use crate::schema::types::SchemaDescriptor;
pub use self::schema::{
arrow_to_parquet_schema, parquet_to_arrow_schema,
parquet_to_arrow_schema_by_columns,
- parquet_to_arrow_schema_by_root_columns,
};
/// Schema metadata key used to store serialized Arrow IPC schema
pub const ARROW_SCHEMA_META_KEY: &str = "ARROW:schema";
+
+/// A [`ProjectionMask`] identifies a set of columns within a potentially
nested schema to project
+#[derive(Debug, Clone)]
+pub struct ProjectionMask {
+ /// A mask of
Review Comment:
Describing in the docstring that a mask of `None` means `All` is probably
also a good idea, as well as which schema a `ProjectionMask`'s indices refer to
(I think Parquet)
##########
parquet/src/arrow/mod.rs:
##########
@@ -133,11 +140,71 @@ pub use self::arrow_reader::ParquetFileArrowReader;
pub use self::arrow_writer::ArrowWriter;
#[cfg(feature = "async")]
pub use self::async_reader::ParquetRecordBatchStreamBuilder;
+use crate::schema::types::SchemaDescriptor;
pub use self::schema::{
arrow_to_parquet_schema, parquet_to_arrow_schema,
parquet_to_arrow_schema_by_columns,
- parquet_to_arrow_schema_by_root_columns,
};
/// Schema metadata key used to store serialized Arrow IPC schema
pub const ARROW_SCHEMA_META_KEY: &str = "ARROW:schema";
+
+/// A [`ProjectionMask`] identifies a set of columns within a potentially
nested schema to project
+#[derive(Debug, Clone)]
+pub struct ProjectionMask {
+ /// A mask of
+ mask: Option<Vec<bool>>,
+}
+
+impl ProjectionMask {
+ /// Create a [`ProjectionMask`] which selects all columns
+ pub fn all() -> Self {
+ Self { mask: None }
+ }
+
+ /// Create a [`ProjectionMask`] which selects only the specified leaf
columns
+ ///
+ /// Note: repeated or out of order indices will not impact the final mask
Review Comment:
Nice -- so the idea is that you enforce masks having in-order indices by
wrapping them in a `ProjectionMask` which enforces this invariant during
construction 👍
##########
parquet/src/arrow/schema.rs:
##########
@@ -155,24 +100,24 @@ fn get_arrow_schema_from_metadata(encoded_meta: &str) ->
Result<Schema> {
Ok(message) => message
.header_as_schema()
.map(arrow::ipc::convert::fb_to_schema)
- .ok_or(ArrowError("the message is not Arrow
Schema".to_string())),
+ .ok_or(arrow_err!("the message is not Arrow Schema")),
Review Comment:
Using `arrow_err` is not obviously better to me: it is the same verbosity
but now requires looking up / knowing what the `arrow_err!` macro does
;0 But it is not worse either
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]