scovich commented on code in PR #7935:
URL: https://github.com/apache/arrow-rs/pull/7935#discussion_r2216465127
##########
parquet-variant/src/builder.rs:
##########
@@ -598,6 +599,49 @@ impl ParentState<'_> {
}
}
}
+
+ // returns the beginning offset of buffer for the parent if it is object
builder, else 0.
+ // for object builder will reuse the buffer from the parent, this is
needed for `finish`
+ // which needs the relative offset from the current variant.
+ fn object_start_offset(&self) -> usize {
+ match self {
+ ParentState::Object {
+ object_start_offset,
+ ..
+ } => *object_start_offset,
+ _ => 0,
+ }
+ }
+
+ /// Return mutable references to the buffer and metadata builder that this
+ /// parent state is using.
+ fn buffer_and_metadata_builder(&mut self) -> (&mut ValueBuffer, &mut
MetadataBuilder) {
+ match self {
+ ParentState::Variant {
+ buffer,
+ metadata_builder,
+ } => (buffer, metadata_builder),
+ ParentState::List {
+ buffer,
+ metadata_builder,
+ ..
+ } => (buffer, metadata_builder),
+ ParentState::Object {
+ buffer,
+ metadata_builder,
+ ..
+ } => (buffer, metadata_builder),
Review Comment:
I _think_ Rust allows this:
```suggestion
ParentState::Variant {
buffer,
metadata_builder,
} |
ParentState::List {
buffer,
metadata_builder,
..
} |
ParentState::Object {
buffer,
metadata_builder,
..
} => (buffer, metadata_builder),
```
Not clear whether that's better or worse than the current code.
I'm also not sure how it would `fmt`.
##########
parquet-variant/src/builder.rs:
##########
@@ -1275,38 +1330,80 @@ impl<'a> ObjectBuilder<'a> {
)));
}
- let data_size = self.buffer.offset();
- let num_fields = self.fields.len();
- let is_large = num_fields > u8::MAX as usize;
+ let metadata_builder = self.parent_state.metadata_builder();
self.fields.sort_by(|&field_a_id, _, &field_b_id, _| {
- let key_a = &metadata_builder.field_name(field_a_id as usize);
- let key_b = &metadata_builder.field_name(field_b_id as usize);
- key_a.cmp(key_b)
+ let field_a_name = metadata_builder.field_name(field_a_id as
usize);
+ let field_b_name = metadata_builder.field_name(field_b_id as
usize);
+ field_a_name.cmp(field_b_name)
});
-
let max_id = self.fields.iter().map(|(i, _)| *i).max().unwrap_or(0);
-
let id_size = int_size(max_id as usize);
- let offset_size = int_size(data_size);
- // Get parent's buffer
let parent_buffer = self.parent_state.buffer();
- let starting_offset = parent_buffer.offset();
+ let current_offset = parent_buffer.offset();
+ // current object starts from `object_start_offset`
+ let data_size = current_offset - self.object_start_offset;
+ let offset_size = int_size(data_size);
- // Write header
+ let num_fields = self.fields.len();
+ let is_large = num_fields > u8::MAX as usize;
+
+ let header_size = 1 + // header byte
+ (if is_large { 4 } else { 1 }) + // num_fields
+ (num_fields * id_size as usize) + // field IDs
+ ((num_fields + 1) * offset_size as usize); // field offsets +
data_size
+
+ let starting_offset = self.object_start_offset;
+
+ // Shift existing data to make room for the header
+ let buffer = parent_buffer.inner_mut();
+ buffer.splice(starting_offset..starting_offset, vec![0u8;
header_size]);
+
+ // Write header at the original start position
+ let mut header_pos = starting_offset;
+
+ // Write header byte
let header = object_header(is_large, id_size, offset_size);
- parent_buffer.append_header(header, is_large, num_fields);
+ buffer[header_pos] = header;
+ header_pos += 1;
+
+ // Write number of fields
+ if is_large {
+ buffer[header_pos..header_pos + 4].copy_from_slice(&(num_fields as
u32).to_le_bytes());
+ header_pos += 4;
+ } else {
+ buffer[header_pos] = num_fields as u8;
+ header_pos += 1;
+ }
- // Write field IDs (sorted order)
- let ids = self.fields.keys().map(|id| *id as usize);
- parent_buffer.append_offset_array(ids, None, id_size);
+ // Write field IDs
+ for (&field_id, _) in &self.fields {
+ let id_bytes = (field_id as usize).to_le_bytes();
+ buffer[header_pos..header_pos + id_size as usize]
+ .copy_from_slice(&id_bytes[..id_size as usize]);
+ header_pos += id_size as usize;
+ }
- // Write the field offset array, followed by the value bytes
- let offsets = std::mem::take(&mut self.fields).into_values();
- parent_buffer.append_offset_array(offsets, Some(data_size),
offset_size);
- parent_buffer.append_slice(self.buffer.inner());
- self.parent_state.finish(starting_offset);
+ // Write field offsets (adjusted for header)
+ for (_, &relative_offset) in &self.fields {
Review Comment:
```suggestion
for relative_offset in self.fields.values() {
```
##########
parquet-variant/src/builder.rs:
##########
@@ -1275,38 +1330,80 @@ impl<'a> ObjectBuilder<'a> {
)));
}
- let data_size = self.buffer.offset();
- let num_fields = self.fields.len();
- let is_large = num_fields > u8::MAX as usize;
+ let metadata_builder = self.parent_state.metadata_builder();
self.fields.sort_by(|&field_a_id, _, &field_b_id, _| {
- let key_a = &metadata_builder.field_name(field_a_id as usize);
- let key_b = &metadata_builder.field_name(field_b_id as usize);
- key_a.cmp(key_b)
+ let field_a_name = metadata_builder.field_name(field_a_id as
usize);
+ let field_b_name = metadata_builder.field_name(field_b_id as
usize);
+ field_a_name.cmp(field_b_name)
});
-
let max_id = self.fields.iter().map(|(i, _)| *i).max().unwrap_or(0);
-
let id_size = int_size(max_id as usize);
- let offset_size = int_size(data_size);
- // Get parent's buffer
let parent_buffer = self.parent_state.buffer();
- let starting_offset = parent_buffer.offset();
+ let current_offset = parent_buffer.offset();
+ // current object starts from `object_start_offset`
+ let data_size = current_offset - self.object_start_offset;
+ let offset_size = int_size(data_size);
- // Write header
+ let num_fields = self.fields.len();
+ let is_large = num_fields > u8::MAX as usize;
+
+ let header_size = 1 + // header byte
+ (if is_large { 4 } else { 1 }) + // num_fields
+ (num_fields * id_size as usize) + // field IDs
+ ((num_fields + 1) * offset_size as usize); // field offsets +
data_size
+
+ let starting_offset = self.object_start_offset;
+
+ // Shift existing data to make room for the header
+ let buffer = parent_buffer.inner_mut();
+ buffer.splice(starting_offset..starting_offset, vec![0u8;
header_size]);
+
+ // Write header at the original start position
+ let mut header_pos = starting_offset;
+
+ // Write header byte
let header = object_header(is_large, id_size, offset_size);
- parent_buffer.append_header(header, is_large, num_fields);
+ buffer[header_pos] = header;
+ header_pos += 1;
+
+ // Write number of fields
+ if is_large {
+ buffer[header_pos..header_pos + 4].copy_from_slice(&(num_fields as
u32).to_le_bytes());
+ header_pos += 4;
+ } else {
+ buffer[header_pos] = num_fields as u8;
+ header_pos += 1;
+ }
- // Write field IDs (sorted order)
- let ids = self.fields.keys().map(|id| *id as usize);
- parent_buffer.append_offset_array(ids, None, id_size);
+ // Write field IDs
+ for (&field_id, _) in &self.fields {
+ let id_bytes = (field_id as usize).to_le_bytes();
+ buffer[header_pos..header_pos + id_size as usize]
+ .copy_from_slice(&id_bytes[..id_size as usize]);
+ header_pos += id_size as usize;
+ }
- // Write the field offset array, followed by the value bytes
- let offsets = std::mem::take(&mut self.fields).into_values();
- parent_buffer.append_offset_array(offsets, Some(data_size),
offset_size);
- parent_buffer.append_slice(self.buffer.inner());
- self.parent_state.finish(starting_offset);
+ // Write field offsets (adjusted for header)
+ for (_, &relative_offset) in &self.fields {
+ let offset_bytes = relative_offset.to_le_bytes();
+ buffer[header_pos..header_pos + offset_size as usize]
+ .copy_from_slice(&offset_bytes[..offset_size as usize]);
+ header_pos += offset_size as usize;
+ }
+
+ // Write data_size
+ let data_size_bytes = data_size.to_le_bytes();
+ buffer[header_pos..header_pos + offset_size as usize]
+ .copy_from_slice(&data_size_bytes[..offset_size as usize]);
+
+ let start_offset_shift = self.parent_state.object_start_offset();
+ self.parent_state
+ .finish(starting_offset - start_offset_shift);
Review Comment:
Why make the caller of `ParentState::finish` extract the start offset? Seems
like parent state can do that more easily and reliably on its own?
##########
parquet-variant/src/builder.rs:
##########
@@ -1275,38 +1330,80 @@ impl<'a> ObjectBuilder<'a> {
)));
}
- let data_size = self.buffer.offset();
- let num_fields = self.fields.len();
- let is_large = num_fields > u8::MAX as usize;
+ let metadata_builder = self.parent_state.metadata_builder();
self.fields.sort_by(|&field_a_id, _, &field_b_id, _| {
- let key_a = &metadata_builder.field_name(field_a_id as usize);
- let key_b = &metadata_builder.field_name(field_b_id as usize);
- key_a.cmp(key_b)
+ let field_a_name = metadata_builder.field_name(field_a_id as
usize);
+ let field_b_name = metadata_builder.field_name(field_b_id as
usize);
+ field_a_name.cmp(field_b_name)
});
-
let max_id = self.fields.iter().map(|(i, _)| *i).max().unwrap_or(0);
-
let id_size = int_size(max_id as usize);
- let offset_size = int_size(data_size);
- // Get parent's buffer
let parent_buffer = self.parent_state.buffer();
- let starting_offset = parent_buffer.offset();
+ let current_offset = parent_buffer.offset();
+ // current object starts from `object_start_offset`
+ let data_size = current_offset - self.object_start_offset;
+ let offset_size = int_size(data_size);
- // Write header
+ let num_fields = self.fields.len();
+ let is_large = num_fields > u8::MAX as usize;
+
+ let header_size = 1 + // header byte
+ (if is_large { 4 } else { 1 }) + // num_fields
+ (num_fields * id_size as usize) + // field IDs
+ ((num_fields + 1) * offset_size as usize); // field offsets +
data_size
+
+ let starting_offset = self.object_start_offset;
+
+ // Shift existing data to make room for the header
+ let buffer = parent_buffer.inner_mut();
+ buffer.splice(starting_offset..starting_offset, vec![0u8;
header_size]);
Review Comment:
aside: I'd be very impressed if the compiler were smart enough to optimize
away the allocation in `vec![0u8; header_size].into_iter()`; if it isn't, the
above could potentially run slower than the current code.
##########
parquet-variant/src/builder.rs:
##########
@@ -1275,38 +1330,80 @@ impl<'a> ObjectBuilder<'a> {
)));
}
- let data_size = self.buffer.offset();
- let num_fields = self.fields.len();
- let is_large = num_fields > u8::MAX as usize;
+ let metadata_builder = self.parent_state.metadata_builder();
self.fields.sort_by(|&field_a_id, _, &field_b_id, _| {
- let key_a = &metadata_builder.field_name(field_a_id as usize);
- let key_b = &metadata_builder.field_name(field_b_id as usize);
- key_a.cmp(key_b)
+ let field_a_name = metadata_builder.field_name(field_a_id as
usize);
+ let field_b_name = metadata_builder.field_name(field_b_id as
usize);
+ field_a_name.cmp(field_b_name)
});
-
let max_id = self.fields.iter().map(|(i, _)| *i).max().unwrap_or(0);
-
let id_size = int_size(max_id as usize);
- let offset_size = int_size(data_size);
- // Get parent's buffer
let parent_buffer = self.parent_state.buffer();
- let starting_offset = parent_buffer.offset();
+ let current_offset = parent_buffer.offset();
+ // current object starts from `object_start_offset`
+ let data_size = current_offset - self.object_start_offset;
+ let offset_size = int_size(data_size);
- // Write header
+ let num_fields = self.fields.len();
+ let is_large = num_fields > u8::MAX as usize;
+
+ let header_size = 1 + // header byte
+ (if is_large { 4 } else { 1 }) + // num_fields
+ (num_fields * id_size as usize) + // field IDs
+ ((num_fields + 1) * offset_size as usize); // field offsets +
data_size
+
+ let starting_offset = self.object_start_offset;
+
+ // Shift existing data to make room for the header
+ let buffer = parent_buffer.inner_mut();
+ buffer.splice(starting_offset..starting_offset, vec![0u8;
header_size]);
Review Comment:
Alternatively -- that small allocation apparently isn't hurting performance.
What if we create the temp vec with initial capacity of `header_size` and then
populate it with the actual header info, field ids, and offsets before splicing
it in. That way, an incorrect `header_size` calculation would not impact
correctness:
```rust
let mut bytes_to_splice = Vec::with_capacity(header_size);
bytes_to_splice.push(header_byte);
if is_large {
bytes_to_splice.extend((num_fields as u32).to_le_bytes());
} else {
bytes_to_splice.push(num_fields as u8);
}
for &field_id in self.fields.keys() {
bytes_to_splice.extend(field_id.to_le_bytes().into_iter().take(id_size));
}
for &offset in self.fields.values() {
bytes_to_splice.extend((offset as
u32).to_le_bytes().into_iter().take(offset_size));
}
bytes_to_splice.extend((data_size as
u32).to_le_bytes().into_iter().take(offset_size));
buffer.splice(starting_offset..starting_offset, bytes_to_splice);
```
##########
parquet-variant/src/builder.rs:
##########
@@ -598,6 +599,49 @@ impl ParentState<'_> {
}
}
}
+
+ // returns the beginning offset of buffer for the parent if it is object
builder, else 0.
+ // for object builder will reuse the buffer from the parent, this is
needed for `finish`
+ // which needs the relative offset from the current variant.
+ fn object_start_offset(&self) -> usize {
Review Comment:
Doesn't variant array also have a starting offset?
##########
parquet-variant/src/builder.rs:
##########
@@ -1275,38 +1330,80 @@ impl<'a> ObjectBuilder<'a> {
)));
}
- let data_size = self.buffer.offset();
- let num_fields = self.fields.len();
- let is_large = num_fields > u8::MAX as usize;
+ let metadata_builder = self.parent_state.metadata_builder();
self.fields.sort_by(|&field_a_id, _, &field_b_id, _| {
- let key_a = &metadata_builder.field_name(field_a_id as usize);
- let key_b = &metadata_builder.field_name(field_b_id as usize);
- key_a.cmp(key_b)
+ let field_a_name = metadata_builder.field_name(field_a_id as
usize);
+ let field_b_name = metadata_builder.field_name(field_b_id as
usize);
+ field_a_name.cmp(field_b_name)
});
-
let max_id = self.fields.iter().map(|(i, _)| *i).max().unwrap_or(0);
-
let id_size = int_size(max_id as usize);
- let offset_size = int_size(data_size);
- // Get parent's buffer
let parent_buffer = self.parent_state.buffer();
- let starting_offset = parent_buffer.offset();
+ let current_offset = parent_buffer.offset();
+ // current object starts from `object_start_offset`
+ let data_size = current_offset - self.object_start_offset;
+ let offset_size = int_size(data_size);
- // Write header
+ let num_fields = self.fields.len();
+ let is_large = num_fields > u8::MAX as usize;
+
+ let header_size = 1 + // header byte
+ (if is_large { 4 } else { 1 }) + // num_fields
+ (num_fields * id_size as usize) + // field IDs
+ ((num_fields + 1) * offset_size as usize); // field offsets +
data_size
Review Comment:
One thing I don't love about this approach is the independent calculation of
the splice size vs. the bytes that actually get inserted. If they ever
disagreed, we'd get badness as the insertions underflow or overflow the splice.
Ideally, we could produce an iterator that emits the desired bytes, and the
splice itself can guarantee correct behavior. But for that to work the iterator
would need to provide an accurate lower bound, so which rules out
`std::iter::from_fn`. Even if we did craft a custom iterator, computing its
lower bound would basically be this same calculation all over again. We could
also chain together a bunch of iterators, which preserves the lower bound, but
somehow I doubt that would be efficient.
##########
parquet-variant/src/builder.rs:
##########
@@ -1275,38 +1330,80 @@ impl<'a> ObjectBuilder<'a> {
)));
}
- let data_size = self.buffer.offset();
- let num_fields = self.fields.len();
- let is_large = num_fields > u8::MAX as usize;
+ let metadata_builder = self.parent_state.metadata_builder();
self.fields.sort_by(|&field_a_id, _, &field_b_id, _| {
- let key_a = &metadata_builder.field_name(field_a_id as usize);
- let key_b = &metadata_builder.field_name(field_b_id as usize);
- key_a.cmp(key_b)
+ let field_a_name = metadata_builder.field_name(field_a_id as
usize);
+ let field_b_name = metadata_builder.field_name(field_b_id as
usize);
+ field_a_name.cmp(field_b_name)
});
-
let max_id = self.fields.iter().map(|(i, _)| *i).max().unwrap_or(0);
-
let id_size = int_size(max_id as usize);
- let offset_size = int_size(data_size);
- // Get parent's buffer
let parent_buffer = self.parent_state.buffer();
- let starting_offset = parent_buffer.offset();
+ let current_offset = parent_buffer.offset();
+ // current object starts from `object_start_offset`
+ let data_size = current_offset - self.object_start_offset;
+ let offset_size = int_size(data_size);
- // Write header
+ let num_fields = self.fields.len();
+ let is_large = num_fields > u8::MAX as usize;
+
+ let header_size = 1 + // header byte
+ (if is_large { 4 } else { 1 }) + // num_fields
+ (num_fields * id_size as usize) + // field IDs
+ ((num_fields + 1) * offset_size as usize); // field offsets +
data_size
+
+ let starting_offset = self.object_start_offset;
+
+ // Shift existing data to make room for the header
+ let buffer = parent_buffer.inner_mut();
+ buffer.splice(starting_offset..starting_offset, vec![0u8;
header_size]);
+
+ // Write header at the original start position
+ let mut header_pos = starting_offset;
+
+ // Write header byte
let header = object_header(is_large, id_size, offset_size);
- parent_buffer.append_header(header, is_large, num_fields);
+ buffer[header_pos] = header;
+ header_pos += 1;
+
+ // Write number of fields
+ if is_large {
+ buffer[header_pos..header_pos + 4].copy_from_slice(&(num_fields as
u32).to_le_bytes());
+ header_pos += 4;
+ } else {
+ buffer[header_pos] = num_fields as u8;
+ header_pos += 1;
+ }
- // Write field IDs (sorted order)
- let ids = self.fields.keys().map(|id| *id as usize);
- parent_buffer.append_offset_array(ids, None, id_size);
+ // Write field IDs
+ for (&field_id, _) in &self.fields {
Review Comment:
```suggestion
for field_id in self.fields.keys() {
```
##########
parquet-variant/src/builder.rs:
##########
@@ -1317,7 +1414,15 @@ impl<'a> ObjectBuilder<'a> {
/// This is to ensure that the object is always finalized before its parent
builder
/// is finalized.
impl Drop for ObjectBuilder<'_> {
- fn drop(&mut self) {}
+ fn drop(&mut self) {
+ // truncate the buffer if the `finish` method has not been called.
+ if !self.has_been_finished {
+ self.parent_state
+ .buffer()
+ .inner_mut()
+ .truncate(self.object_start_offset);
+ }
Review Comment:
While we're at it, we should truncate the `MetadataBuilder` back to the size
it had when we started, to clean up any new field ids the failed builder might
have created.
##########
parquet-variant/src/builder.rs:
##########
@@ -1275,38 +1330,80 @@ impl<'a> ObjectBuilder<'a> {
)));
}
- let data_size = self.buffer.offset();
- let num_fields = self.fields.len();
- let is_large = num_fields > u8::MAX as usize;
+ let metadata_builder = self.parent_state.metadata_builder();
self.fields.sort_by(|&field_a_id, _, &field_b_id, _| {
- let key_a = &metadata_builder.field_name(field_a_id as usize);
- let key_b = &metadata_builder.field_name(field_b_id as usize);
- key_a.cmp(key_b)
+ let field_a_name = metadata_builder.field_name(field_a_id as
usize);
+ let field_b_name = metadata_builder.field_name(field_b_id as
usize);
+ field_a_name.cmp(field_b_name)
});
-
let max_id = self.fields.iter().map(|(i, _)| *i).max().unwrap_or(0);
Review Comment:
aside: Rather than pay a traversal over the list of fields, I would just use
the metadata builder's current length as a _very_ cheap to compute upper bound.
It will almost always be a tight upper bound as well -- it would take a pretty
carefully crafted object to use only the early/small field ids of a large
dictionary, such that the dictionary length needs more encoding bytes than the
largest field ids of this object.
##########
parquet-variant/src/builder.rs:
##########
@@ -598,6 +599,49 @@ impl ParentState<'_> {
}
}
}
+
+ // returns the beginning offset of buffer for the parent if it is object
builder, else 0.
+ // for object builder will reuse the buffer from the parent, this is
needed for `finish`
+ // which needs the relative offset from the current variant.
+ fn object_start_offset(&self) -> usize {
+ match self {
+ ParentState::Object {
+ object_start_offset,
+ ..
+ } => *object_start_offset,
+ _ => 0,
+ }
+ }
+
+ /// Return mutable references to the buffer and metadata builder that this
+ /// parent state is using.
+ fn buffer_and_metadata_builder(&mut self) -> (&mut ValueBuffer, &mut
MetadataBuilder) {
+ match self {
+ ParentState::Variant {
+ buffer,
+ metadata_builder,
+ } => (buffer, metadata_builder),
+ ParentState::List {
+ buffer,
+ metadata_builder,
+ ..
+ } => (buffer, metadata_builder),
+ ParentState::Object {
+ buffer,
+ metadata_builder,
+ ..
+ } => (buffer, metadata_builder),
+ }
+ }
+
+ // return the offset of the underlying buffer at the time of calling this
method.
+ fn buffer_current_offset(&self) -> usize {
+ match self {
+ ParentState::Variant { buffer, .. } => buffer.offset(),
+ ParentState::Object { buffer, .. } => buffer.offset(),
+ ParentState::List { buffer, .. } => buffer.offset(),
+ }
Review Comment:
As above, can reduce redundancy by
```suggestion
match self {
ParentState::Variant { buffer, .. }
| ParentState::Object { buffer, .. }
| ParentState::List { buffer, .. } => buffer.offset(),
}
```
##########
parquet-variant/src/builder.rs:
##########
@@ -1275,38 +1330,80 @@ impl<'a> ObjectBuilder<'a> {
)));
}
- let data_size = self.buffer.offset();
- let num_fields = self.fields.len();
- let is_large = num_fields > u8::MAX as usize;
+ let metadata_builder = self.parent_state.metadata_builder();
self.fields.sort_by(|&field_a_id, _, &field_b_id, _| {
- let key_a = &metadata_builder.field_name(field_a_id as usize);
- let key_b = &metadata_builder.field_name(field_b_id as usize);
- key_a.cmp(key_b)
+ let field_a_name = metadata_builder.field_name(field_a_id as
usize);
+ let field_b_name = metadata_builder.field_name(field_b_id as
usize);
+ field_a_name.cmp(field_b_name)
});
-
let max_id = self.fields.iter().map(|(i, _)| *i).max().unwrap_or(0);
-
let id_size = int_size(max_id as usize);
- let offset_size = int_size(data_size);
- // Get parent's buffer
let parent_buffer = self.parent_state.buffer();
- let starting_offset = parent_buffer.offset();
+ let current_offset = parent_buffer.offset();
+ // current object starts from `object_start_offset`
+ let data_size = current_offset - self.object_start_offset;
+ let offset_size = int_size(data_size);
- // Write header
+ let num_fields = self.fields.len();
+ let is_large = num_fields > u8::MAX as usize;
+
+ let header_size = 1 + // header byte
+ (if is_large { 4 } else { 1 }) + // num_fields
+ (num_fields * id_size as usize) + // field IDs
+ ((num_fields + 1) * offset_size as usize); // field offsets +
data_size
+
+ let starting_offset = self.object_start_offset;
+
+ // Shift existing data to make room for the header
+ let buffer = parent_buffer.inner_mut();
+ buffer.splice(starting_offset..starting_offset, vec![0u8;
header_size]);
Review Comment:
Can avoid even this allocation:
```suggestion
buffer.splice(starting_offset..starting_offset,
std::iter::repeat_n(0u8, header_size));
```
##########
parquet-variant/src/builder.rs:
##########
@@ -1275,38 +1330,80 @@ impl<'a> ObjectBuilder<'a> {
)));
}
- let data_size = self.buffer.offset();
- let num_fields = self.fields.len();
- let is_large = num_fields > u8::MAX as usize;
+ let metadata_builder = self.parent_state.metadata_builder();
self.fields.sort_by(|&field_a_id, _, &field_b_id, _| {
- let key_a = &metadata_builder.field_name(field_a_id as usize);
- let key_b = &metadata_builder.field_name(field_b_id as usize);
- key_a.cmp(key_b)
+ let field_a_name = metadata_builder.field_name(field_a_id as
usize);
+ let field_b_name = metadata_builder.field_name(field_b_id as
usize);
+ field_a_name.cmp(field_b_name)
});
-
let max_id = self.fields.iter().map(|(i, _)| *i).max().unwrap_or(0);
-
let id_size = int_size(max_id as usize);
- let offset_size = int_size(data_size);
- // Get parent's buffer
let parent_buffer = self.parent_state.buffer();
- let starting_offset = parent_buffer.offset();
+ let current_offset = parent_buffer.offset();
+ // current object starts from `object_start_offset`
+ let data_size = current_offset - self.object_start_offset;
+ let offset_size = int_size(data_size);
- // Write header
+ let num_fields = self.fields.len();
+ let is_large = num_fields > u8::MAX as usize;
+
+ let header_size = 1 + // header byte
+ (if is_large { 4 } else { 1 }) + // num_fields
+ (num_fields * id_size as usize) + // field IDs
+ ((num_fields + 1) * offset_size as usize); // field offsets +
data_size
Review Comment:
... but again, I worry that wrapping up all those for-loops inside an
iterator for direct splicing will turn out to be a lot slower than the
splice-then-overwrite approach the code currently takes.
##########
parquet-variant/src/builder.rs:
##########
@@ -1275,38 +1330,80 @@ impl<'a> ObjectBuilder<'a> {
)));
}
- let data_size = self.buffer.offset();
- let num_fields = self.fields.len();
- let is_large = num_fields > u8::MAX as usize;
+ let metadata_builder = self.parent_state.metadata_builder();
self.fields.sort_by(|&field_a_id, _, &field_b_id, _| {
- let key_a = &metadata_builder.field_name(field_a_id as usize);
- let key_b = &metadata_builder.field_name(field_b_id as usize);
- key_a.cmp(key_b)
+ let field_a_name = metadata_builder.field_name(field_a_id as
usize);
+ let field_b_name = metadata_builder.field_name(field_b_id as
usize);
+ field_a_name.cmp(field_b_name)
});
-
let max_id = self.fields.iter().map(|(i, _)| *i).max().unwrap_or(0);
-
let id_size = int_size(max_id as usize);
- let offset_size = int_size(data_size);
- // Get parent's buffer
let parent_buffer = self.parent_state.buffer();
- let starting_offset = parent_buffer.offset();
+ let current_offset = parent_buffer.offset();
+ // current object starts from `object_start_offset`
+ let data_size = current_offset - self.object_start_offset;
+ let offset_size = int_size(data_size);
- // Write header
+ let num_fields = self.fields.len();
+ let is_large = num_fields > u8::MAX as usize;
+
+ let header_size = 1 + // header byte
+ (if is_large { 4 } else { 1 }) + // num_fields
+ (num_fields * id_size as usize) + // field IDs
+ ((num_fields + 1) * offset_size as usize); // field offsets +
data_size
Review Comment:
Something like this might _almost_ work?
```rust
let field_ids = self.fields.keys().flat_map(|field_id| {
(field_id as usize).to_le_bytes().into_iter().take(id_size)
});
let offsets = self.fields.values().flat_map(|offset| {
offset.to_le_bytes().into_iter().take(offset_size)
});
let num_fields = num_fields.to_le_bytes().into_iter().take(if is_large { 4 } else { 1 });
let header_and_num_fields = std::iter::once(header_byte).chain(num_fields);
let field_ids_and_offsets = field_ids.chain(offsets);
let bytes_to_splice = header_and_num_fields.chain(field_ids_and_offsets);
buffer.splice(starting_offset..starting_offset, bytes_to_splice);
```
... but unfortunately `Iterator::flat_map` does _not_ (honestly, cannot)
compute an accurate lower bound size hint. So the `splice` call would end up
having to allocate an internal temp buffer.
##########
parquet-variant/src/builder.rs:
##########
@@ -598,6 +599,49 @@ impl ParentState<'_> {
}
}
}
+
+ // returns the beginning offset of buffer for the parent if it is object
builder, else 0.
+ // for object builder will reuse the buffer from the parent, this is
needed for `finish`
+ // which needs the relative offset from the current variant.
+ fn object_start_offset(&self) -> usize {
+ match self {
+ ParentState::Object {
+ object_start_offset,
+ ..
+ } => *object_start_offset,
+ _ => 0,
+ }
+ }
+
+ /// Return mutable references to the buffer and metadata builder that this
+ /// parent state is using.
+ fn buffer_and_metadata_builder(&mut self) -> (&mut ValueBuffer, &mut
MetadataBuilder) {
+ match self {
+ ParentState::Variant {
+ buffer,
+ metadata_builder,
+ } => (buffer, metadata_builder),
+ ParentState::List {
+ buffer,
+ metadata_builder,
+ ..
+ } => (buffer, metadata_builder),
+ ParentState::Object {
+ buffer,
+ metadata_builder,
+ ..
+ } => (buffer, metadata_builder),
+ }
+ }
+
+ // return the offset of the underlying buffer at the time of calling this
method.
+ fn buffer_current_offset(&self) -> usize {
+ match self {
+ ParentState::Variant { buffer, .. } => buffer.offset(),
+ ParentState::Object { buffer, .. } => buffer.offset(),
+ ParentState::List { buffer, .. } => buffer.offset(),
+ }
Review Comment:
Actually... we can just invoke `ParentState::buffer`?
```suggestion
self.buffer().offset()
```
##########
parquet-variant/src/builder.rs:
##########
@@ -1275,38 +1330,80 @@ impl<'a> ObjectBuilder<'a> {
)));
}
- let data_size = self.buffer.offset();
- let num_fields = self.fields.len();
- let is_large = num_fields > u8::MAX as usize;
+ let metadata_builder = self.parent_state.metadata_builder();
self.fields.sort_by(|&field_a_id, _, &field_b_id, _| {
- let key_a = &metadata_builder.field_name(field_a_id as usize);
- let key_b = &metadata_builder.field_name(field_b_id as usize);
- key_a.cmp(key_b)
+ let field_a_name = metadata_builder.field_name(field_a_id as
usize);
+ let field_b_name = metadata_builder.field_name(field_b_id as
usize);
+ field_a_name.cmp(field_b_name)
});
-
let max_id = self.fields.iter().map(|(i, _)| *i).max().unwrap_or(0);
-
let id_size = int_size(max_id as usize);
- let offset_size = int_size(data_size);
- // Get parent's buffer
let parent_buffer = self.parent_state.buffer();
- let starting_offset = parent_buffer.offset();
+ let current_offset = parent_buffer.offset();
+ // current object starts from `object_start_offset`
+ let data_size = current_offset - self.object_start_offset;
+ let offset_size = int_size(data_size);
- // Write header
+ let num_fields = self.fields.len();
+ let is_large = num_fields > u8::MAX as usize;
+
+ let header_size = 1 + // header byte
+ (if is_large { 4 } else { 1 }) + // num_fields
+ (num_fields * id_size as usize) + // field IDs
+ ((num_fields + 1) * offset_size as usize); // field offsets +
data_size
Review Comment:
I just realized -- a custom iterator that computes its size hint is still
safer than the current code, because an incorrect size hint won't corrupt any
data. It will just require extra allocations and/or byte shifting. There's
still the question of performance tho. If it's drastically slower the extra
safety probably wouldn't be worth it.
Another possibility is to take a hybrid approach -- rely on
`Iterator::chain` for most of the heavy lifting, but define a custom iterator
for emitting offset/field_id arrays:
```rust
let num_fields = num_fields.to_le_bytes().into_iter().take(if is_large { 4 } else { 1 });
let header_and_num_fields = std::iter::once(header_byte).chain(num_fields);
let field_ids = PackedU32Iterator::new(id_size, self.fields.keys().copied());
let offsets = PackedU32Iterator::new(offset_size,
self.fields.values().map(|offset| *offset as u32));
let field_ids_and_offsets = field_ids.chain(offsets);
let bytes_to_splice = header_and_num_fields.chain(field_ids_and_offsets);
buffer.splice(starting_offset..starting_offset, bytes_to_splice);
```
<details>
<summary>PackedU32Iterator</summary>
```rust
struct PackedU32Iterator<T: Iterator<Item = [u8; 4]>> {
packed_bytes: usize,
iterator: T,
current_item: [u8; 4],
current_byte: usize, // 0..3
}
impl<T: Iterator<Item = [u8; 4]>> PackedU32Iterator<T> {
fn new(packed_bytes: usize, iterator: T) -> Self {
// eliminate corner cases in `next` by initializing with a fake
already-consumed "first" item
Self {
packed_bytes,
iterator,
current_item: [0; 4],
current_byte: packed_bytes,
}
}
}
impl<T: Iterator<Item = [u8; 4]>> Iterator for PackedU32Iterator<T> {
    type Item = u8;
    fn size_hint(&self) -> (usize, Option<usize>) {
        let lower = (self.packed_bytes - self.current_byte)
            + self.packed_bytes * self.iterator.size_hint().0;
        (lower, None)
    }
fn next(&mut self) -> Option<u8> {
if self.current_byte >= self.packed_bytes {
let Some(next_item) = self.iterator.next() else {
return None;
};
self.current_item = next_item;
self.current_byte = 0;
}
let rval = self.current_item[self.current_byte];
self.current_byte += 1;
Some(rval)
}
}
```
</details>
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]