scovich commented on code in PR #7935: URL: https://github.com/apache/arrow-rs/pull/7935#discussion_r2216465127
########## parquet-variant/src/builder.rs: ########## @@ -598,6 +599,49 @@ impl ParentState<'_> { } } } + + // returns the beginning offset of buffer for the parent if it is object builder, else 0. + // for object builder will reuse the buffer from the parent, this is needed for `finish` + // which needs the relative offset from the current variant. + fn object_start_offset(&self) -> usize { + match self { + ParentState::Object { + object_start_offset, + .. + } => *object_start_offset, + _ => 0, + } + } + + /// Return mutable references to the buffer and metadata builder that this + /// parent state is using. + fn buffer_and_metadata_builder(&mut self) -> (&mut ValueBuffer, &mut MetadataBuilder) { + match self { + ParentState::Variant { + buffer, + metadata_builder, + } => (buffer, metadata_builder), + ParentState::List { + buffer, + metadata_builder, + .. + } => (buffer, metadata_builder), + ParentState::Object { + buffer, + metadata_builder, + .. + } => (buffer, metadata_builder), Review Comment: I _think_ rust allows this: ```suggestion ParentState::Variant { buffer, metadata_builder, } | ParentState::List { buffer, metadata_builder, .. } | ParentState::Object { buffer, metadata_builder, .. } => (buffer, metadata_builder), ``` Not clear whether that's better or worse than the current code. I'm also not sure how it would `fmt`. 
########## parquet-variant/src/builder.rs: ########## @@ -1275,38 +1330,80 @@ impl<'a> ObjectBuilder<'a> { ))); } - let data_size = self.buffer.offset(); - let num_fields = self.fields.len(); - let is_large = num_fields > u8::MAX as usize; + let metadata_builder = self.parent_state.metadata_builder(); self.fields.sort_by(|&field_a_id, _, &field_b_id, _| { - let key_a = &metadata_builder.field_name(field_a_id as usize); - let key_b = &metadata_builder.field_name(field_b_id as usize); - key_a.cmp(key_b) + let field_a_name = metadata_builder.field_name(field_a_id as usize); + let field_b_name = metadata_builder.field_name(field_b_id as usize); + field_a_name.cmp(field_b_name) }); - let max_id = self.fields.iter().map(|(i, _)| *i).max().unwrap_or(0); - let id_size = int_size(max_id as usize); - let offset_size = int_size(data_size); - // Get parent's buffer let parent_buffer = self.parent_state.buffer(); - let starting_offset = parent_buffer.offset(); + let current_offset = parent_buffer.offset(); + // current object starts from `object_start_offset` + let data_size = current_offset - self.object_start_offset; + let offset_size = int_size(data_size); - // Write header + let num_fields = self.fields.len(); + let is_large = num_fields > u8::MAX as usize; + + let header_size = 1 + // header byte + (if is_large { 4 } else { 1 }) + // num_fields + (num_fields * id_size as usize) + // field IDs + ((num_fields + 1) * offset_size as usize); // field offsets + data_size + + let starting_offset = self.object_start_offset; + + // Shift existing data to make room for the header + let buffer = parent_buffer.inner_mut(); + buffer.splice(starting_offset..starting_offset, vec![0u8; header_size]); + + // Write header at the original start position + let mut header_pos = starting_offset; + + // Write header byte let header = object_header(is_large, id_size, offset_size); - parent_buffer.append_header(header, is_large, num_fields); + buffer[header_pos] = header; + header_pos += 1; + + // 
Write number of fields + if is_large { + buffer[header_pos..header_pos + 4].copy_from_slice(&(num_fields as u32).to_le_bytes()); + header_pos += 4; + } else { + buffer[header_pos] = num_fields as u8; + header_pos += 1; + } - // Write field IDs (sorted order) - let ids = self.fields.keys().map(|id| *id as usize); - parent_buffer.append_offset_array(ids, None, id_size); + // Write field IDs + for (&field_id, _) in &self.fields { + let id_bytes = (field_id as usize).to_le_bytes(); + buffer[header_pos..header_pos + id_size as usize] + .copy_from_slice(&id_bytes[..id_size as usize]); + header_pos += id_size as usize; + } - // Write the field offset array, followed by the value bytes - let offsets = std::mem::take(&mut self.fields).into_values(); - parent_buffer.append_offset_array(offsets, Some(data_size), offset_size); - parent_buffer.append_slice(self.buffer.inner()); - self.parent_state.finish(starting_offset); + // Write field offsets (adjusted for header) + for (_, &relative_offset) in &self.fields { Review Comment: ```suggestion for relative_offset in self.fields.values() { ``` ########## parquet-variant/src/builder.rs: ########## @@ -1275,38 +1330,80 @@ impl<'a> ObjectBuilder<'a> { ))); } - let data_size = self.buffer.offset(); - let num_fields = self.fields.len(); - let is_large = num_fields > u8::MAX as usize; + let metadata_builder = self.parent_state.metadata_builder(); self.fields.sort_by(|&field_a_id, _, &field_b_id, _| { - let key_a = &metadata_builder.field_name(field_a_id as usize); - let key_b = &metadata_builder.field_name(field_b_id as usize); - key_a.cmp(key_b) + let field_a_name = metadata_builder.field_name(field_a_id as usize); + let field_b_name = metadata_builder.field_name(field_b_id as usize); + field_a_name.cmp(field_b_name) }); - let max_id = self.fields.iter().map(|(i, _)| *i).max().unwrap_or(0); - let id_size = int_size(max_id as usize); - let offset_size = int_size(data_size); - // Get parent's buffer let parent_buffer = 
self.parent_state.buffer(); - let starting_offset = parent_buffer.offset(); + let current_offset = parent_buffer.offset(); + // current object starts from `object_start_offset` + let data_size = current_offset - self.object_start_offset; + let offset_size = int_size(data_size); - // Write header + let num_fields = self.fields.len(); + let is_large = num_fields > u8::MAX as usize; + + let header_size = 1 + // header byte + (if is_large { 4 } else { 1 }) + // num_fields + (num_fields * id_size as usize) + // field IDs + ((num_fields + 1) * offset_size as usize); // field offsets + data_size + + let starting_offset = self.object_start_offset; + + // Shift existing data to make room for the header + let buffer = parent_buffer.inner_mut(); + buffer.splice(starting_offset..starting_offset, vec![0u8; header_size]); + + // Write header at the original start position + let mut header_pos = starting_offset; + + // Write header byte let header = object_header(is_large, id_size, offset_size); - parent_buffer.append_header(header, is_large, num_fields); + buffer[header_pos] = header; + header_pos += 1; + + // Write number of fields + if is_large { + buffer[header_pos..header_pos + 4].copy_from_slice(&(num_fields as u32).to_le_bytes()); + header_pos += 4; + } else { + buffer[header_pos] = num_fields as u8; + header_pos += 1; + } - // Write field IDs (sorted order) - let ids = self.fields.keys().map(|id| *id as usize); - parent_buffer.append_offset_array(ids, None, id_size); + // Write field IDs + for (&field_id, _) in &self.fields { + let id_bytes = (field_id as usize).to_le_bytes(); + buffer[header_pos..header_pos + id_size as usize] + .copy_from_slice(&id_bytes[..id_size as usize]); + header_pos += id_size as usize; + } - // Write the field offset array, followed by the value bytes - let offsets = std::mem::take(&mut self.fields).into_values(); - parent_buffer.append_offset_array(offsets, Some(data_size), offset_size); - parent_buffer.append_slice(self.buffer.inner()); - 
self.parent_state.finish(starting_offset); + // Write field offsets (adjusted for header) + for (_, &relative_offset) in &self.fields { + let offset_bytes = relative_offset.to_le_bytes(); + buffer[header_pos..header_pos + offset_size as usize] + .copy_from_slice(&offset_bytes[..offset_size as usize]); + header_pos += offset_size as usize; + } + + // Write data_size + let data_size_bytes = data_size.to_le_bytes(); + buffer[header_pos..header_pos + offset_size as usize] + .copy_from_slice(&data_size_bytes[..offset_size as usize]); + + let start_offset_shift = self.parent_state.object_start_offset(); + self.parent_state + .finish(starting_offset - start_offset_shift); Review Comment: Why make the caller of `ParentState::finish` extract the start offset? Seems like parent state can do that more easily and reliably on its own? ########## parquet-variant/src/builder.rs: ########## @@ -1275,38 +1330,80 @@ impl<'a> ObjectBuilder<'a> { ))); } - let data_size = self.buffer.offset(); - let num_fields = self.fields.len(); - let is_large = num_fields > u8::MAX as usize; + let metadata_builder = self.parent_state.metadata_builder(); self.fields.sort_by(|&field_a_id, _, &field_b_id, _| { - let key_a = &metadata_builder.field_name(field_a_id as usize); - let key_b = &metadata_builder.field_name(field_b_id as usize); - key_a.cmp(key_b) + let field_a_name = metadata_builder.field_name(field_a_id as usize); + let field_b_name = metadata_builder.field_name(field_b_id as usize); + field_a_name.cmp(field_b_name) }); - let max_id = self.fields.iter().map(|(i, _)| *i).max().unwrap_or(0); - let id_size = int_size(max_id as usize); - let offset_size = int_size(data_size); - // Get parent's buffer let parent_buffer = self.parent_state.buffer(); - let starting_offset = parent_buffer.offset(); + let current_offset = parent_buffer.offset(); + // current object starts from `object_start_offset` + let data_size = current_offset - self.object_start_offset; + let offset_size = int_size(data_size); 
- // Write header + let num_fields = self.fields.len(); + let is_large = num_fields > u8::MAX as usize; + + let header_size = 1 + // header byte + (if is_large { 4 } else { 1 }) + // num_fields + (num_fields * id_size as usize) + // field IDs + ((num_fields + 1) * offset_size as usize); // field offsets + data_size + + let starting_offset = self.object_start_offset; + + // Shift existing data to make room for the header + let buffer = parent_buffer.inner_mut(); + buffer.splice(starting_offset..starting_offset, vec![0u8; header_size]); Review Comment: aside: I'd be very impressed if the compiler is smart enough to optimize away the allocation in `vec![0u8; header_size].into_iter()`, which would potentially cause the above to run slower than the current code. ########## parquet-variant/src/builder.rs: ########## @@ -1275,38 +1330,80 @@ impl<'a> ObjectBuilder<'a> { ))); } - let data_size = self.buffer.offset(); - let num_fields = self.fields.len(); - let is_large = num_fields > u8::MAX as usize; + let metadata_builder = self.parent_state.metadata_builder(); self.fields.sort_by(|&field_a_id, _, &field_b_id, _| { - let key_a = &metadata_builder.field_name(field_a_id as usize); - let key_b = &metadata_builder.field_name(field_b_id as usize); - key_a.cmp(key_b) + let field_a_name = metadata_builder.field_name(field_a_id as usize); + let field_b_name = metadata_builder.field_name(field_b_id as usize); + field_a_name.cmp(field_b_name) }); - let max_id = self.fields.iter().map(|(i, _)| *i).max().unwrap_or(0); - let id_size = int_size(max_id as usize); - let offset_size = int_size(data_size); - // Get parent's buffer let parent_buffer = self.parent_state.buffer(); - let starting_offset = parent_buffer.offset(); + let current_offset = parent_buffer.offset(); + // current object starts from `object_start_offset` + let data_size = current_offset - self.object_start_offset; + let offset_size = int_size(data_size); - // Write header + let num_fields = self.fields.len(); + let 
is_large = num_fields > u8::MAX as usize; + + let header_size = 1 + // header byte + (if is_large { 4 } else { 1 }) + // num_fields + (num_fields * id_size as usize) + // field IDs + ((num_fields + 1) * offset_size as usize); // field offsets + data_size + + let starting_offset = self.object_start_offset; + + // Shift existing data to make room for the header + let buffer = parent_buffer.inner_mut(); + buffer.splice(starting_offset..starting_offset, vec![0u8; header_size]); Review Comment: Alternatively -- that small allocation apparently isn't hurting performance. What if we create the temp vec with initial capacity of `header_size` and then populate it with the actual header info, field ids, and offsets before splicing it in. That way, an incorrect `header_size` calculation would not impact correctness: ```rust let mut bytes_to_splice = Vec::with_capacity(header_size); bytes_to_splice.push(header_byte); if is_large { bytes_to_splice.extend((num_fields as u32).to_le_bytes()); } else { bytes_to_splice.push(num_fields as u8); } for &field_id in self.fields.keys() { bytes_to_splice.extend(field_id.to_le_bytes().into_iter().take(id_size)); } for &offset in self.fields.values() { bytes_to_splice.extend((offset as u32).to_le_bytes().into_iter().take(offset_size)); } bytes_to_splice.extend((data_size as u32).to_le_bytes().into_iter().take(offset_size)); buffer.splice(starting_offset..starting_offset, bytes_to_splice); ``` ########## parquet-variant/src/builder.rs: ########## @@ -598,6 +599,49 @@ impl ParentState<'_> { } } } + + // returns the beginning offset of buffer for the parent if it is object builder, else 0. + // for object builder will reuse the buffer from the parent, this is needed for `finish` + // which needs the relative offset from the current variant. + fn object_start_offset(&self) -> usize { Review Comment: Doesn't variant array also have a starting offset? 
########## parquet-variant/src/builder.rs: ########## @@ -1275,38 +1330,80 @@ impl<'a> ObjectBuilder<'a> { ))); } - let data_size = self.buffer.offset(); - let num_fields = self.fields.len(); - let is_large = num_fields > u8::MAX as usize; + let metadata_builder = self.parent_state.metadata_builder(); self.fields.sort_by(|&field_a_id, _, &field_b_id, _| { - let key_a = &metadata_builder.field_name(field_a_id as usize); - let key_b = &metadata_builder.field_name(field_b_id as usize); - key_a.cmp(key_b) + let field_a_name = metadata_builder.field_name(field_a_id as usize); + let field_b_name = metadata_builder.field_name(field_b_id as usize); + field_a_name.cmp(field_b_name) }); - let max_id = self.fields.iter().map(|(i, _)| *i).max().unwrap_or(0); - let id_size = int_size(max_id as usize); - let offset_size = int_size(data_size); - // Get parent's buffer let parent_buffer = self.parent_state.buffer(); - let starting_offset = parent_buffer.offset(); + let current_offset = parent_buffer.offset(); + // current object starts from `object_start_offset` + let data_size = current_offset - self.object_start_offset; + let offset_size = int_size(data_size); - // Write header + let num_fields = self.fields.len(); + let is_large = num_fields > u8::MAX as usize; + + let header_size = 1 + // header byte + (if is_large { 4 } else { 1 }) + // num_fields + (num_fields * id_size as usize) + // field IDs + ((num_fields + 1) * offset_size as usize); // field offsets + data_size Review Comment: One thing I don't love about this approach is the independent calculation of the splice size vs. the bytes that actually get inserted. If they ever disagreed... badness as the insertions underflow or overflow the splice. Ideally, we could produce an iterator that emits the desired bytes, and the splice itself can guarantee correct behavior. But for that to work the iterator would need to provide an accurate lower bound, which rules out `std::iter::from_fn`. 
Even if we did craft a custom iterator, computing its lower bound would basically be this same calculation all over again. We could also chain together a bunch of iterators, which preserves the lower bound, but somehow I doubt that would be efficient. ########## parquet-variant/src/builder.rs: ########## @@ -1275,38 +1330,80 @@ impl<'a> ObjectBuilder<'a> { ))); } - let data_size = self.buffer.offset(); - let num_fields = self.fields.len(); - let is_large = num_fields > u8::MAX as usize; + let metadata_builder = self.parent_state.metadata_builder(); self.fields.sort_by(|&field_a_id, _, &field_b_id, _| { - let key_a = &metadata_builder.field_name(field_a_id as usize); - let key_b = &metadata_builder.field_name(field_b_id as usize); - key_a.cmp(key_b) + let field_a_name = metadata_builder.field_name(field_a_id as usize); + let field_b_name = metadata_builder.field_name(field_b_id as usize); + field_a_name.cmp(field_b_name) }); - let max_id = self.fields.iter().map(|(i, _)| *i).max().unwrap_or(0); - let id_size = int_size(max_id as usize); - let offset_size = int_size(data_size); - // Get parent's buffer let parent_buffer = self.parent_state.buffer(); - let starting_offset = parent_buffer.offset(); + let current_offset = parent_buffer.offset(); + // current object starts from `object_start_offset` + let data_size = current_offset - self.object_start_offset; + let offset_size = int_size(data_size); - // Write header + let num_fields = self.fields.len(); + let is_large = num_fields > u8::MAX as usize; + + let header_size = 1 + // header byte + (if is_large { 4 } else { 1 }) + // num_fields + (num_fields * id_size as usize) + // field IDs + ((num_fields + 1) * offset_size as usize); // field offsets + data_size + + let starting_offset = self.object_start_offset; + + // Shift existing data to make room for the header + let buffer = parent_buffer.inner_mut(); + buffer.splice(starting_offset..starting_offset, vec![0u8; header_size]); + + // Write header at the original start 
position + let mut header_pos = starting_offset; + + // Write header byte let header = object_header(is_large, id_size, offset_size); - parent_buffer.append_header(header, is_large, num_fields); + buffer[header_pos] = header; + header_pos += 1; + + // Write number of fields + if is_large { + buffer[header_pos..header_pos + 4].copy_from_slice(&(num_fields as u32).to_le_bytes()); + header_pos += 4; + } else { + buffer[header_pos] = num_fields as u8; + header_pos += 1; + } - // Write field IDs (sorted order) - let ids = self.fields.keys().map(|id| *id as usize); - parent_buffer.append_offset_array(ids, None, id_size); + // Write field IDs + for (&field_id, _) in &self.fields { Review Comment: ```suggestion for field_id in self.fields.keys() { ``` ########## parquet-variant/src/builder.rs: ########## @@ -1317,7 +1414,15 @@ impl<'a> ObjectBuilder<'a> { /// This is to ensure that the object is always finalized before its parent builder /// is finalized. impl Drop for ObjectBuilder<'_> { - fn drop(&mut self) {} + fn drop(&mut self) { + // truncate the buffer if the `finish` method has not been called. + if !self.has_been_finished { + self.parent_state + .buffer() + .inner_mut() + .truncate(self.object_start_offset); + } Review Comment: While we're at it, we should truncate the `MetadataBuilder` back to the size it had when we started, to clean up any new field ids the failed builder might have created. 
########## parquet-variant/src/builder.rs: ########## @@ -1275,38 +1330,80 @@ impl<'a> ObjectBuilder<'a> { ))); } - let data_size = self.buffer.offset(); - let num_fields = self.fields.len(); - let is_large = num_fields > u8::MAX as usize; + let metadata_builder = self.parent_state.metadata_builder(); self.fields.sort_by(|&field_a_id, _, &field_b_id, _| { - let key_a = &metadata_builder.field_name(field_a_id as usize); - let key_b = &metadata_builder.field_name(field_b_id as usize); - key_a.cmp(key_b) + let field_a_name = metadata_builder.field_name(field_a_id as usize); + let field_b_name = metadata_builder.field_name(field_b_id as usize); + field_a_name.cmp(field_b_name) }); - let max_id = self.fields.iter().map(|(i, _)| *i).max().unwrap_or(0); Review Comment: aside: Rather than pay a traversal over the list of fields, I would just use the metadata builder's current length as a _very_ cheap to compute upper bound. It will almost always be a tight upper bound as well -- it would take a pretty carefully crafted object to use only the early/small field ids of a large dictionary, such that the dictionary length needs more encoding bytes than the largest field ids of this object. ########## parquet-variant/src/builder.rs: ########## @@ -598,6 +599,49 @@ impl ParentState<'_> { } } } + + // returns the beginning offset of buffer for the parent if it is object builder, else 0. + // for object builder will reuse the buffer from the parent, this is needed for `finish` + // which needs the relative offset from the current variant. + fn object_start_offset(&self) -> usize { + match self { + ParentState::Object { + object_start_offset, + .. + } => *object_start_offset, + _ => 0, + } + } + + /// Return mutable references to the buffer and metadata builder that this + /// parent state is using. 
+ fn buffer_and_metadata_builder(&mut self) -> (&mut ValueBuffer, &mut MetadataBuilder) { + match self { + ParentState::Variant { + buffer, + metadata_builder, + } => (buffer, metadata_builder), + ParentState::List { + buffer, + metadata_builder, + .. + } => (buffer, metadata_builder), + ParentState::Object { + buffer, + metadata_builder, + .. + } => (buffer, metadata_builder), + } + } + + // return the offset of the underlying buffer at the time of calling this method. + fn buffer_current_offset(&self) -> usize { + match self { + ParentState::Variant { buffer, .. } => buffer.offset(), + ParentState::Object { buffer, .. } => buffer.offset(), + ParentState::List { buffer, .. } => buffer.offset(), + } Review Comment: As above, can reduce redundancy by ```suggestion match self { ParentState::Variant { buffer, .. } | ParentState::Object { buffer, .. } | ParentState::List { buffer, .. } => buffer.offset(), } ``` ########## parquet-variant/src/builder.rs: ########## @@ -1275,38 +1330,80 @@ impl<'a> ObjectBuilder<'a> { ))); } - let data_size = self.buffer.offset(); - let num_fields = self.fields.len(); - let is_large = num_fields > u8::MAX as usize; + let metadata_builder = self.parent_state.metadata_builder(); self.fields.sort_by(|&field_a_id, _, &field_b_id, _| { - let key_a = &metadata_builder.field_name(field_a_id as usize); - let key_b = &metadata_builder.field_name(field_b_id as usize); - key_a.cmp(key_b) + let field_a_name = metadata_builder.field_name(field_a_id as usize); + let field_b_name = metadata_builder.field_name(field_b_id as usize); + field_a_name.cmp(field_b_name) }); - let max_id = self.fields.iter().map(|(i, _)| *i).max().unwrap_or(0); - let id_size = int_size(max_id as usize); - let offset_size = int_size(data_size); - // Get parent's buffer let parent_buffer = self.parent_state.buffer(); - let starting_offset = parent_buffer.offset(); + let current_offset = parent_buffer.offset(); + // current object starts from `object_start_offset` + let data_size 
= current_offset - self.object_start_offset; + let offset_size = int_size(data_size); - // Write header + let num_fields = self.fields.len(); + let is_large = num_fields > u8::MAX as usize; + + let header_size = 1 + // header byte + (if is_large { 4 } else { 1 }) + // num_fields + (num_fields * id_size as usize) + // field IDs + ((num_fields + 1) * offset_size as usize); // field offsets + data_size + + let starting_offset = self.object_start_offset; + + // Shift existing data to make room for the header + let buffer = parent_buffer.inner_mut(); + buffer.splice(starting_offset..starting_offset, vec![0u8; header_size]); Review Comment: Can avoid even this allocation: ```suggestion buffer.splice(starting_offset..starting_offset, std::iter::repeat_n(0u8, header_size)); ``` ########## parquet-variant/src/builder.rs: ########## @@ -1275,38 +1330,80 @@ impl<'a> ObjectBuilder<'a> { ))); } - let data_size = self.buffer.offset(); - let num_fields = self.fields.len(); - let is_large = num_fields > u8::MAX as usize; + let metadata_builder = self.parent_state.metadata_builder(); self.fields.sort_by(|&field_a_id, _, &field_b_id, _| { - let key_a = &metadata_builder.field_name(field_a_id as usize); - let key_b = &metadata_builder.field_name(field_b_id as usize); - key_a.cmp(key_b) + let field_a_name = metadata_builder.field_name(field_a_id as usize); + let field_b_name = metadata_builder.field_name(field_b_id as usize); + field_a_name.cmp(field_b_name) }); - let max_id = self.fields.iter().map(|(i, _)| *i).max().unwrap_or(0); - let id_size = int_size(max_id as usize); - let offset_size = int_size(data_size); - // Get parent's buffer let parent_buffer = self.parent_state.buffer(); - let starting_offset = parent_buffer.offset(); + let current_offset = parent_buffer.offset(); + // current object starts from `object_start_offset` + let data_size = current_offset - self.object_start_offset; + let offset_size = int_size(data_size); - // Write header + let num_fields = 
self.fields.len(); + let is_large = num_fields > u8::MAX as usize; + + let header_size = 1 + // header byte + (if is_large { 4 } else { 1 }) + // num_fields + (num_fields * id_size as usize) + // field IDs + ((num_fields + 1) * offset_size as usize); // field offsets + data_size Review Comment: ... but again, I worry that wrapping up all those for-loops inside an iterator for direct splicing will turn out to be a lot slower than the splice-then-overwrite approach the code currently takes. ########## parquet-variant/src/builder.rs: ########## @@ -1275,38 +1330,80 @@ impl<'a> ObjectBuilder<'a> { ))); } - let data_size = self.buffer.offset(); - let num_fields = self.fields.len(); - let is_large = num_fields > u8::MAX as usize; + let metadata_builder = self.parent_state.metadata_builder(); self.fields.sort_by(|&field_a_id, _, &field_b_id, _| { - let key_a = &metadata_builder.field_name(field_a_id as usize); - let key_b = &metadata_builder.field_name(field_b_id as usize); - key_a.cmp(key_b) + let field_a_name = metadata_builder.field_name(field_a_id as usize); + let field_b_name = metadata_builder.field_name(field_b_id as usize); + field_a_name.cmp(field_b_name) }); - let max_id = self.fields.iter().map(|(i, _)| *i).max().unwrap_or(0); - let id_size = int_size(max_id as usize); - let offset_size = int_size(data_size); - // Get parent's buffer let parent_buffer = self.parent_state.buffer(); - let starting_offset = parent_buffer.offset(); + let current_offset = parent_buffer.offset(); + // current object starts from `object_start_offset` + let data_size = current_offset - self.object_start_offset; + let offset_size = int_size(data_size); - // Write header + let num_fields = self.fields.len(); + let is_large = num_fields > u8::MAX as usize; + + let header_size = 1 + // header byte + (if is_large { 4 } else { 1 }) + // num_fields + (num_fields * id_size as usize) + // field IDs + ((num_fields + 1) * offset_size as usize); // field offsets + data_size Review Comment: 
Something like this might _almost_ work? ```rust let field_ids = self.fields.keys().flat_map(|field_id| { (field_id as usize).to_le_bytes().into_iter().take(id_size) }); let offsets = self.fields.values().flat_map(|offset| { offset.to_le_bytes().into_iter().take(offset_size) }); let num_fields = num_fields.to_le_bytes().into_iter().take(if is_large { 4 } else { 1 }); let header_and_num_fields = std::iter::once(header_byte).chain(num_fields); let field_ids_and_offsets = field_ids.chain(offsets); let bytes_to_splice = header_and_num_fields.chain(field_ids_and_offsets); buffer.splice(starting_offset..starting_offset, bytes_to_splice); ``` ... but unfortunately `Iterator::flat_map` does _not_ (honestly, cannot) compute an accurate lower bound size hint. So the `splice` call would end up having to allocate an internal temp buffer. ########## parquet-variant/src/builder.rs: ########## @@ -598,6 +599,49 @@ impl ParentState<'_> { } } } + + // returns the beginning offset of buffer for the parent if it is object builder, else 0. + // for object builder will reuse the buffer from the parent, this is needed for `finish` + // which needs the relative offset from the current variant. + fn object_start_offset(&self) -> usize { + match self { + ParentState::Object { + object_start_offset, + .. + } => *object_start_offset, + _ => 0, + } + } + + /// Return mutable references to the buffer and metadata builder that this + /// parent state is using. + fn buffer_and_metadata_builder(&mut self) -> (&mut ValueBuffer, &mut MetadataBuilder) { + match self { + ParentState::Variant { + buffer, + metadata_builder, + } => (buffer, metadata_builder), + ParentState::List { + buffer, + metadata_builder, + .. + } => (buffer, metadata_builder), + ParentState::Object { + buffer, + metadata_builder, + .. + } => (buffer, metadata_builder), + } + } + + // return the offset of the underlying buffer at the time of calling this method. 
+ fn buffer_current_offset(&self) -> usize { + match self { + ParentState::Variant { buffer, .. } => buffer.offset(), + ParentState::Object { buffer, .. } => buffer.offset(), + ParentState::List { buffer, .. } => buffer.offset(), + } Review Comment: Actually... we can just invoke `ParentState::buffer`? ```suggestion self.buffer().offset() ``` ########## parquet-variant/src/builder.rs: ########## @@ -1275,38 +1330,80 @@ impl<'a> ObjectBuilder<'a> { ))); } - let data_size = self.buffer.offset(); - let num_fields = self.fields.len(); - let is_large = num_fields > u8::MAX as usize; + let metadata_builder = self.parent_state.metadata_builder(); self.fields.sort_by(|&field_a_id, _, &field_b_id, _| { - let key_a = &metadata_builder.field_name(field_a_id as usize); - let key_b = &metadata_builder.field_name(field_b_id as usize); - key_a.cmp(key_b) + let field_a_name = metadata_builder.field_name(field_a_id as usize); + let field_b_name = metadata_builder.field_name(field_b_id as usize); + field_a_name.cmp(field_b_name) }); - let max_id = self.fields.iter().map(|(i, _)| *i).max().unwrap_or(0); - let id_size = int_size(max_id as usize); - let offset_size = int_size(data_size); - // Get parent's buffer let parent_buffer = self.parent_state.buffer(); - let starting_offset = parent_buffer.offset(); + let current_offset = parent_buffer.offset(); + // current object starts from `object_start_offset` + let data_size = current_offset - self.object_start_offset; + let offset_size = int_size(data_size); - // Write header + let num_fields = self.fields.len(); + let is_large = num_fields > u8::MAX as usize; + + let header_size = 1 + // header byte + (if is_large { 4 } else { 1 }) + // num_fields + (num_fields * id_size as usize) + // field IDs + ((num_fields + 1) * offset_size as usize); // field offsets + data_size Review Comment: I just realized -- a custom iterator that computes its size hint is still safer than the current code, because an incorrect size hint won't corrupt any 
data. It will just require extra allocations and/or byte shifting. There's still the question of performance tho. If it's drastically slower the extra safety probably wouldn't be worth it. Another possibility is to take a hybrid approach -- rely on `Iterator::chain` for most of the heavy lifting, but define a custom iterator for emitting offset/field_id arrays: ```rust let num_fields = num_fields.to_le_bytes().into_iter().take(if is_large { 4 } else { 1 }); let header_and_num_fields = std::iter::once(header_byte).chain(num_fields); let field_ids = PackedU32Iterator::new(id_size, self.fields.keys().copied()); let offsets = PackedU32Iterator::new(offset_size, self.fields.values().map(|offset| *offset as u32)); let field_ids_and_offsets = field_ids.chain(offsets); let bytes_to_splice = header_and_num_fields.chain(field_ids_and_offsets); buffer.splice(starting_offset..starting_offset, bytes_to_splice); ``` <details> <summary>PackedU32Iterator</summary> ```rust struct PackedU32Iterator<T: Iterator<Item = [u8; 4]>> { packed_bytes: usize, iterator: T, current_item: [u8; 4], current_byte: usize, // 0..3 } impl<T: Iterator<Item = [u8; 4]>> PackedU32Iterator<T> { fn new(packed_bytes: usize, iterator: T) -> Self { // eliminate corner cases in `next` by initializing with a fake already-consumed "first" item Self { packed_bytes, iterator, current_item: [0; 4], current_byte: packed_bytes, } } } impl<T: Iterator<Item = [u8; 4]>> Iterator for PackedU32Iterator<T> { type Item = u8; fn size_hint(&self) -> (usize, Option<usize>) { let lower = (self.packed_bytes - self.current_byte) + self.packed_bytes * self.iterator.size_hint().0; (lower, None) } fn next(&mut self) -> Option<u8> { if self.current_byte >= self.packed_bytes { let Some(next_item) = self.iterator.next() else { return None; }; self.current_item = next_item; self.current_byte = 0; } let rval = self.current_item[self.current_byte]; self.current_byte += 1; Some(rval) } } ``` </details> -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org