etseidl commented on code in PR #9521:
URL: https://github.com/apache/arrow-rs/pull/9521#discussion_r2913796417


##########
parquet/src/column/writer/mod.rs:
##########
@@ -1048,40 +1056,50 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, 
E> {
 
         let compressed_page = match self.props.writer_version() {
             WriterVersion::PARQUET_1_0 => {
-                let mut buffer = vec![];
-
-                if max_rep_level > 0 {
-                    buffer.extend_from_slice(
-                        &self.encode_levels_v1(
-                            Encoding::RLE,
-                            &self.rep_levels_sink[..],
-                            max_rep_level,
-                        )[..],
-                    );
-                }
+                // Encode levels into locals first to avoid borrow conflict

Review Comment:
   Here we need to allocate up to 2 temp buffers to store the level data. In 
the original there are still 2 allocations, but there's a chance that the 
second allocation will reuse memory just freed up from the first.



##########
parquet/src/column/writer/mod.rs:
##########
@@ -1048,40 +1056,50 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, 
E> {
 
         let compressed_page = match self.props.writer_version() {
             WriterVersion::PARQUET_1_0 => {
-                let mut buffer = vec![];
-
-                if max_rep_level > 0 {
-                    buffer.extend_from_slice(
-                        &self.encode_levels_v1(
-                            Encoding::RLE,
-                            &self.rep_levels_sink[..],
-                            max_rep_level,
-                        )[..],
-                    );
-                }
+                // Encode levels into locals first to avoid borrow conflict
+                // (encode_levels_v1 borrows &self, page_buf needs &mut self)
+                let rep_levels_encoded = if max_rep_level > 0 {
+                    Some(self.encode_levels_v1(
+                        Encoding::RLE,
+                        &self.rep_levels_sink[..],
+                        max_rep_level,
+                    ))
+                } else {
+                    None
+                };
+                let def_levels_encoded = if max_def_level > 0 {
+                    Some(self.encode_levels_v1(
+                        Encoding::RLE,
+                        &self.def_levels_sink[..],
+                        max_def_level,
+                    ))
+                } else {
+                    None
+                };
 
-                if max_def_level > 0 {
-                    buffer.extend_from_slice(
-                        &self.encode_levels_v1(
-                            Encoding::RLE,
-                            &self.def_levels_sink[..],
-                            max_def_level,
-                        )[..],
-                    );
+                // Assemble page data into reusable buffer
+                self.page_buf.clear();
+                if let Some(ref levels) = rep_levels_encoded {
+                    self.page_buf.extend_from_slice(levels);
                 }
-
-                buffer.extend_from_slice(&values_data.buf);
-                let uncompressed_size = buffer.len();
-
-                if let Some(ref mut cmpr) = self.compressor {
-                    let mut compressed_buf = 
Vec::with_capacity(uncompressed_size);
-                    cmpr.compress(&buffer[..], &mut compressed_buf)?;
-                    compressed_buf.shrink_to_fit();
-                    buffer = compressed_buf;
+                if let Some(ref levels) = def_levels_encoded {
+                    self.page_buf.extend_from_slice(levels);
                 }
+                self.page_buf.extend_from_slice(&values_data.buf);
+                let uncompressed_size = self.page_buf.len();
+
+                let page_bytes = if let Some(ref mut cmpr) = self.compressor {
+                    self.compressed_buf.clear();

Review Comment:
   This is the only clear win I see here. We're replacing up to 2 allocations 
with 1. For poorly compressed data it's possible the `shrink_to_fit` will do 
nothing.



##########
parquet/src/column/writer/mod.rs:
##########
@@ -1114,24 +1142,32 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, 
E> {
                 // Data Page v2 compresses values only.
                 let is_compressed = match self.compressor {
                     Some(ref mut cmpr) => {
-                        let buffer_len = buffer.len();
-                        cmpr.compress(&values_data.buf, &mut buffer)?;
-                        if uncompressed_size <= buffer.len() - buffer_len {
-                            buffer.truncate(buffer_len);
-                            buffer.extend_from_slice(&values_data.buf);
+                        let buffer_len = self.page_buf.len();
+                        cmpr.compress(&values_data.buf, &mut self.page_buf)?;
+                        if uncompressed_size <= self.page_buf.len() - 
buffer_len {
+                            self.page_buf.truncate(buffer_len);
+                            self.page_buf.extend_from_slice(&values_data.buf);
                             false
                         } else {
                             true
                         }
                     }
                     None => {
-                        buffer.extend_from_slice(&values_data.buf);
+                        self.page_buf.extend_from_slice(&values_data.buf);
                         false
                     }
                 };
 
+                let page_bytes = if is_compressed {

Review Comment:
   For V2 pages, there's really no win here. The uncompressed path still does 
one extra alloc, and the compressed path merely trades the allocation of the 
old temp buffer for the one incurred by `copy_from_slice`.



##########
parquet/src/column/writer/mod.rs:
##########
@@ -1048,40 +1056,50 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, 
E> {
 
         let compressed_page = match self.props.writer_version() {
             WriterVersion::PARQUET_1_0 => {
-                let mut buffer = vec![];
-
-                if max_rep_level > 0 {
-                    buffer.extend_from_slice(
-                        &self.encode_levels_v1(
-                            Encoding::RLE,
-                            &self.rep_levels_sink[..],
-                            max_rep_level,
-                        )[..],
-                    );
-                }
+                // Encode levels into locals first to avoid borrow conflict
+                // (encode_levels_v1 borrows &self, page_buf needs &mut self)
+                let rep_levels_encoded = if max_rep_level > 0 {
+                    Some(self.encode_levels_v1(
+                        Encoding::RLE,
+                        &self.rep_levels_sink[..],
+                        max_rep_level,
+                    ))
+                } else {
+                    None
+                };
+                let def_levels_encoded = if max_def_level > 0 {
+                    Some(self.encode_levels_v1(
+                        Encoding::RLE,
+                        &self.def_levels_sink[..],
+                        max_def_level,
+                    ))
+                } else {
+                    None
+                };
 
-                if max_def_level > 0 {
-                    buffer.extend_from_slice(
-                        &self.encode_levels_v1(
-                            Encoding::RLE,
-                            &self.def_levels_sink[..],
-                            max_def_level,
-                        )[..],
-                    );
+                // Assemble page data into reusable buffer
+                self.page_buf.clear();
+                if let Some(ref levels) = rep_levels_encoded {
+                    self.page_buf.extend_from_slice(levels);
                 }
-
-                buffer.extend_from_slice(&values_data.buf);
-                let uncompressed_size = buffer.len();
-
-                if let Some(ref mut cmpr) = self.compressor {
-                    let mut compressed_buf = 
Vec::with_capacity(uncompressed_size);
-                    cmpr.compress(&buffer[..], &mut compressed_buf)?;
-                    compressed_buf.shrink_to_fit();
-                    buffer = compressed_buf;
+                if let Some(ref levels) = def_levels_encoded {
+                    self.page_buf.extend_from_slice(levels);
                 }
+                self.page_buf.extend_from_slice(&values_data.buf);
+                let uncompressed_size = self.page_buf.len();
+
+                let page_bytes = if let Some(ref mut cmpr) = self.compressor {
+                    self.compressed_buf.clear();
+                    cmpr.compress(&self.page_buf[..], &mut 
self.compressed_buf)?;
+                    Bytes::copy_from_slice(&self.compressed_buf)
+                } else {
+                    // Zero-cost ownership transfer instead of memcpy.
+                    // page_buf will regrow on next page (one alloc, same as 
pre-reuse code).
+                    Bytes::from(std::mem::take(&mut self.page_buf))

Review Comment:
   So for N uncompressed pages, this will actually perform N+1 vec 
allocations (each `mem::take` leaves behind an empty vec that must regrow 
on the next page), and the same applies to V2 pages.



##########
parquet/src/column/writer/mod.rs:
##########
@@ -1114,24 +1142,32 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, 
E> {
                 // Data Page v2 compresses values only.
                 let is_compressed = match self.compressor {
                     Some(ref mut cmpr) => {
-                        let buffer_len = buffer.len();
-                        cmpr.compress(&values_data.buf, &mut buffer)?;
-                        if uncompressed_size <= buffer.len() - buffer_len {
-                            buffer.truncate(buffer_len);
-                            buffer.extend_from_slice(&values_data.buf);
+                        let buffer_len = self.page_buf.len();
+                        cmpr.compress(&values_data.buf, &mut self.page_buf)?;
+                        if uncompressed_size <= self.page_buf.len() - 
buffer_len {
+                            self.page_buf.truncate(buffer_len);
+                            self.page_buf.extend_from_slice(&values_data.buf);
                             false
                         } else {
                             true
                         }
                     }
                     None => {
-                        buffer.extend_from_slice(&values_data.buf);
+                        self.page_buf.extend_from_slice(&values_data.buf);
                         false
                     }
                 };
 
+                let page_bytes = if is_compressed {
+                    // Compressed: copy smaller data, reuse buffer for next 
page
+                    Bytes::copy_from_slice(&self.page_buf)

Review Comment:
   If we're doing an alloc+copy, why not just do 
`Bytes::from(std::mem::take(&mut self.page_buf))` for both cases (or copy for 
both)?
   
   Also, the compression buffer is never used for V2 pages.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to