This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new b9a41f3ff Use bytes in parquet (#1474) (#1683)
b9a41f3ff is described below

commit b9a41f3ff64c624ecc8b0c6045095becba02f370
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Wed May 11 17:54:07 2022 +0100

    Use bytes in parquet (#1474) (#1683)
---
 parquet/Cargo.toml                          |   1 +
 parquet/src/arrow/array_reader.rs           |   4 +-
 parquet/src/arrow/array_reader/test_util.rs |   9 +-
 parquet/src/column/reader.rs                |  16 +-
 parquet/src/column/writer.rs                |   8 +-
 parquet/src/data_type.rs                    |  13 +-
 parquet/src/encodings/decoding.rs           |  11 +-
 parquet/src/encodings/encoding.rs           |  83 ++---
 parquet/src/encodings/levels.rs             |   2 +-
 parquet/src/util/memory.rs                  | 479 ++--------------------------
 parquet/src/util/test_common/page_util.rs   |  10 +-
 11 files changed, 91 insertions(+), 545 deletions(-)

diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml
index 265f95197..94ed8bbc3 100644
--- a/parquet/Cargo.toml
+++ b/parquet/Cargo.toml
@@ -31,6 +31,7 @@ rust-version = "1.57"
 
 [dependencies]
 parquet-format = "4.0.0"
+bytes = "1.1"
 byteorder = "1"
 thrift = "0.13"
 snap = { version = "1.0", optional = true }
diff --git a/parquet/src/arrow/array_reader.rs 
b/parquet/src/arrow/array_reader.rs
index 1d8441cbd..d2250f8ef 100644
--- a/parquet/src/arrow/array_reader.rs
+++ b/parquet/src/arrow/array_reader.rs
@@ -1381,7 +1381,6 @@ mod tests {
     #[test]
     fn test_complex_array_reader_dict_enc_string() {
         use crate::encodings::encoding::{DictEncoder, Encoder};
-        use crate::util::memory::MemTracker;
         // Construct column schema
         let message_type = "
         message test_schema {
@@ -1412,9 +1411,8 @@ mod tests {
         let mut all_values = Vec::with_capacity(num_pages * values_per_page);
 
         for i in 0..num_pages {
-            let mem_tracker = Arc::new(MemTracker::new());
             let mut dict_encoder =
-                DictEncoder::<ByteArrayType>::new(column_desc.clone(), 
mem_tracker);
+                DictEncoder::<ByteArrayType>::new(column_desc.clone());
             // add data page
             let mut values = Vec::with_capacity(values_per_page);
 
diff --git a/parquet/src/arrow/array_reader/test_util.rs 
b/parquet/src/arrow/array_reader/test_util.rs
index afee4659c..0c044eb2d 100644
--- a/parquet/src/arrow/array_reader/test_util.rs
+++ b/parquet/src/arrow/array_reader/test_util.rs
@@ -29,7 +29,7 @@ use crate::errors::Result;
 use crate::schema::types::{
     ColumnDescPtr, ColumnDescriptor, ColumnPath, SchemaDescPtr, Type,
 };
-use crate::util::memory::{ByteBufferPtr, MemTracker};
+use crate::util::memory::ByteBufferPtr;
 
 /// Returns a descriptor for a UTF-8 column
 pub fn utf8_column() -> ColumnDescPtr {
@@ -49,9 +49,7 @@ pub fn utf8_column() -> ColumnDescPtr {
 /// Encode `data` with the provided `encoding`
 pub fn encode_byte_array(encoding: Encoding, data: &[ByteArray]) -> 
ByteBufferPtr {
     let descriptor = utf8_column();
-    let mem_tracker = Arc::new(MemTracker::new());
-    let mut encoder =
-        get_encoder::<ByteArrayType>(descriptor, encoding, 
mem_tracker).unwrap();
+    let mut encoder = get_encoder::<ByteArrayType>(descriptor, 
encoding).unwrap();
 
     encoder.put(data).unwrap();
     encoder.flush_buffer().unwrap()
@@ -59,8 +57,7 @@ pub fn encode_byte_array(encoding: Encoding, data: 
&[ByteArray]) -> ByteBufferPt
 
 /// Returns the encoded dictionary and value data
 pub fn encode_dictionary(data: &[ByteArray]) -> (ByteBufferPtr, ByteBufferPtr) 
{
-    let mut dict_encoder =
-        DictEncoder::<ByteArrayType>::new(utf8_column(), 
Arc::new(MemTracker::new()));
+    let mut dict_encoder = DictEncoder::<ByteArrayType>::new(utf8_column());
 
     dict_encoder.put(data).unwrap();
     let encoded_rle = dict_encoder.flush_buffer().unwrap();
diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs
index 1fc722f29..3a45ecf3f 100644
--- a/parquet/src/column/reader.rs
+++ b/parquet/src/column/reader.rs
@@ -338,13 +338,13 @@ where
                             let mut offset = 0;
 
                             if max_rep_level > 0 {
-                                let level_data = parse_v1_level(
+                                let (bytes_read, level_data) = parse_v1_level(
                                     max_rep_level,
                                     num_values,
                                     rep_level_encoding,
                                     buf.start_from(offset),
                                 )?;
-                                offset = level_data.end();
+                                offset += bytes_read;
 
                                 let decoder =
                                     R::new(max_rep_level, rep_level_encoding, 
level_data);
@@ -353,13 +353,13 @@ where
                             }
 
                             if max_def_level > 0 {
-                                let level_data = parse_v1_level(
+                                let (bytes_read, level_data) = parse_v1_level(
                                     max_def_level,
                                     num_values,
                                     def_level_encoding,
                                     buf.start_from(offset),
                                 )?;
-                                offset = level_data.end();
+                                offset += bytes_read;
 
                                 let decoder =
                                     D::new(max_def_level, def_level_encoding, 
level_data);
@@ -460,20 +460,20 @@ fn parse_v1_level(
     num_buffered_values: u32,
     encoding: Encoding,
     buf: ByteBufferPtr,
-) -> Result<ByteBufferPtr> {
+) -> Result<(usize, ByteBufferPtr)> {
     match encoding {
         Encoding::RLE => {
             let i32_size = std::mem::size_of::<i32>();
             let data_size = read_num_bytes!(i32, i32_size, buf.as_ref()) as 
usize;
-            Ok(buf.range(i32_size, data_size))
+            Ok((i32_size + data_size, buf.range(i32_size, data_size)))
         }
         Encoding::BIT_PACKED => {
             let bit_width = crate::util::bit_util::log2(max_level as u64 + 1) 
as u8;
             let num_bytes = ceil(
                 (num_buffered_values as usize * bit_width as usize) as i64,
                 8,
-            );
-            Ok(buf.range(0, num_bytes as usize))
+            ) as usize;
+            Ok((num_bytes, buf.range(0, num_bytes)))
         }
         _ => Err(general_err!("invalid level encoding: {}", encoding)),
     }
diff --git a/parquet/src/column/writer.rs b/parquet/src/column/writer.rs
index 13ea85157..a7d0ba8fc 100644
--- a/parquet/src/column/writer.rs
+++ b/parquet/src/column/writer.rs
@@ -16,7 +16,7 @@
 // under the License.
 
 //! Contains column writer API.
-use std::{cmp, collections::VecDeque, convert::TryFrom, marker::PhantomData, 
sync::Arc};
+use std::{cmp, collections::VecDeque, convert::TryFrom, marker::PhantomData};
 
 use crate::basic::{Compression, ConvertedType, Encoding, LogicalType, 
PageType, Type};
 use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter};
@@ -36,7 +36,7 @@ use crate::file::{
 };
 use crate::schema::types::ColumnDescPtr;
 use crate::util::bit_util::FromBytes;
-use crate::util::memory::{ByteBufferPtr, MemTracker};
+use crate::util::memory::ByteBufferPtr;
 
 /// Column writer for a Parquet type.
 pub enum ColumnWriter {
@@ -213,7 +213,7 @@ impl<T: DataType> ColumnWriterImpl<T> {
         let dict_encoder = if props.dictionary_enabled(descr.path())
             && has_dictionary_support(T::get_physical_type(), &props)
         {
-            Some(DictEncoder::new(descr.clone(), Arc::new(MemTracker::new())))
+            Some(DictEncoder::new(descr.clone()))
         } else {
             None
         };
@@ -227,7 +227,6 @@ impl<T: DataType> ColumnWriterImpl<T> {
             props
                 .encoding(descr.path())
                 .unwrap_or_else(|| fallback_encoding(T::get_physical_type(), 
&props)),
-            Arc::new(MemTracker::new()),
         )
         .unwrap();
 
@@ -1135,6 +1134,7 @@ fn compare_greater_byte_array_decimals(a: &[u8], b: 
&[u8]) -> bool {
 #[cfg(test)]
 mod tests {
     use rand::distributions::uniform::SampleUniform;
+    use std::sync::Arc;
 
     use crate::column::{
         page::PageReader,
diff --git a/parquet/src/data_type.rs b/parquet/src/data_type.rs
index ae1e01365..28645a262 100644
--- a/parquet/src/data_type.rs
+++ b/parquet/src/data_type.rs
@@ -31,7 +31,7 @@ use crate::column::writer::{ColumnWriter, ColumnWriterImpl};
 use crate::errors::{ParquetError, Result};
 use crate::util::{
     bit_util::{from_ne_slice, FromBytes},
-    memory::{ByteBuffer, ByteBufferPtr},
+    memory::ByteBufferPtr,
 };
 
 /// Rust representation for logical type INT96, value is backed by an array of 
`u32`.
@@ -217,14 +217,6 @@ impl From<ByteBufferPtr> for ByteArray {
     }
 }
 
-impl From<ByteBuffer> for ByteArray {
-    fn from(mut buf: ByteBuffer) -> ByteArray {
-        Self {
-            data: Some(buf.consume()),
-        }
-    }
-}
-
 impl PartialEq for ByteArray {
     fn eq(&self, other: &ByteArray) -> bool {
         match (&self.data, &other.data) {
@@ -1322,8 +1314,7 @@ mod tests {
             ByteArray::from(ByteBufferPtr::new(vec![1u8, 2u8, 3u8, 4u8, 
5u8])).data(),
             &[1u8, 2u8, 3u8, 4u8, 5u8]
         );
-        let mut buf = ByteBuffer::new();
-        buf.set_data(vec![6u8, 7u8, 8u8, 9u8, 10u8]);
+        let buf = vec![6u8, 7u8, 8u8, 9u8, 10u8];
         assert_eq!(ByteArray::from(buf).data(), &[6u8, 7u8, 8u8, 9u8, 10u8]);
     }
 
diff --git a/parquet/src/encodings/decoding.rs 
b/parquet/src/encodings/decoding.rs
index 24e0c962e..7c95d5532 100644
--- a/parquet/src/encodings/decoding.rs
+++ b/parquet/src/encodings/decoding.rs
@@ -936,11 +936,7 @@ mod tests {
     use crate::schema::types::{
         ColumnDescPtr, ColumnDescriptor, ColumnPath, Type as SchemaType,
     };
-    use crate::util::{
-        bit_util::set_array_bit,
-        memory::{BufferPtr, MemTracker},
-        test_common::RandGen,
-    };
+    use crate::util::{bit_util::set_array_bit, test_common::RandGen};
 
     #[test]
     fn test_get_decoders() {
@@ -1389,7 +1385,7 @@ mod tests {
 
         let length = data.len();
 
-        let ptr = BufferPtr::new(data);
+        let ptr = ByteBufferPtr::new(data);
         let mut reader = BitReader::new(ptr.clone());
         assert_eq!(reader.get_vlq_int().unwrap(), 256);
         assert_eq!(reader.get_vlq_int().unwrap(), 4);
@@ -1472,8 +1468,7 @@ mod tests {
 
         // Encode data
         let mut encoder =
-            get_encoder::<T>(col_descr.clone(), encoding, 
Arc::new(MemTracker::new()))
-                .expect("get encoder");
+            get_encoder::<T>(col_descr.clone(), encoding).expect("get 
encoder");
 
         for v in &data[..] {
             encoder.put(&v[..]).expect("ok to encode");
diff --git a/parquet/src/encodings/encoding.rs 
b/parquet/src/encodings/encoding.rs
index f4f304625..4b9cf2e9b 100644
--- a/parquet/src/encodings/encoding.rs
+++ b/parquet/src/encodings/encoding.rs
@@ -28,7 +28,7 @@ use crate::schema::types::ColumnDescPtr;
 use crate::util::{
     bit_util::{self, log2, num_required_bits, BitWriter},
     hash_util,
-    memory::{Buffer, ByteBuffer, ByteBufferPtr, MemTrackerPtr},
+    memory::ByteBufferPtr,
 };
 
 // ----------------------------------------------------------------------
@@ -76,10 +76,9 @@ pub trait Encoder<T: DataType> {
 pub fn get_encoder<T: DataType>(
     desc: ColumnDescPtr,
     encoding: Encoding,
-    mem_tracker: MemTrackerPtr,
 ) -> Result<Box<dyn Encoder<T>>> {
     let encoder: Box<dyn Encoder<T>> = match encoding {
-        Encoding::PLAIN => Box::new(PlainEncoder::new(desc, mem_tracker, 
vec![])),
+        Encoding::PLAIN => Box::new(PlainEncoder::new(desc, vec![])),
         Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => {
             return Err(general_err!(
                 "Cannot initialize this encoding through this function"
@@ -109,7 +108,7 @@ pub fn get_encoder<T: DataType>(
 /// - BYTE_ARRAY - 4 byte length stored as little endian, followed by bytes.
 /// - FIXED_LEN_BYTE_ARRAY - just the bytes are stored.
 pub struct PlainEncoder<T: DataType> {
-    buffer: ByteBuffer,
+    buffer: Vec<u8>,
     bit_writer: BitWriter,
     desc: ColumnDescPtr,
     _phantom: PhantomData<T>,
@@ -117,11 +116,9 @@ pub struct PlainEncoder<T: DataType> {
 
 impl<T: DataType> PlainEncoder<T> {
     /// Creates new plain encoder.
-    pub fn new(desc: ColumnDescPtr, mem_tracker: MemTrackerPtr, vec: Vec<u8>) 
-> Self {
-        let mut byte_buffer = ByteBuffer::new().with_mem_tracker(mem_tracker);
-        byte_buffer.set_data(vec);
+    pub fn new(desc: ColumnDescPtr, buffer: Vec<u8>) -> Self {
         Self {
-            buffer: byte_buffer,
+            buffer,
             bit_writer: BitWriter::new(256),
             desc,
             _phantom: PhantomData,
@@ -139,16 +136,15 @@ impl<T: DataType> Encoder<T> for PlainEncoder<T> {
     }
 
     fn estimated_data_encoded_size(&self) -> usize {
-        self.buffer.size() + self.bit_writer.bytes_written()
+        self.buffer.len() + self.bit_writer.bytes_written()
     }
 
     #[inline]
     fn flush_buffer(&mut self) -> Result<ByteBufferPtr> {
-        self.buffer.write_all(self.bit_writer.flush_buffer())?;
-        self.buffer.flush()?;
+        self.buffer
+            .extend_from_slice(self.bit_writer.flush_buffer());
         self.bit_writer.clear();
-
-        Ok(self.buffer.consume())
+        Ok(std::mem::take(&mut self.buffer).into())
     }
 
     #[inline]
@@ -189,35 +185,31 @@ pub struct DictEncoder<T: DataType> {
     // Stores indices which map (many-to-one) to the values in the `uniques` 
array.
     // Here we are using fix-sized array with linear probing.
     // A slot with `HASH_SLOT_EMPTY` indicates the slot is not currently 
occupied.
-    hash_slots: Buffer<i32>,
+    hash_slots: Vec<i32>,
 
 // Indices that have not yet been written out by `write_indices()`.
-    buffered_indices: Buffer<i32>,
+    buffered_indices: Vec<i32>,
 
     // The unique observed values.
-    uniques: Buffer<T::T>,
+    uniques: Vec<T::T>,
 
     // Size in bytes needed to encode this dictionary.
     uniques_size_in_bytes: usize,
-
-    // Tracking memory usage for the various data structures in this struct.
-    mem_tracker: MemTrackerPtr,
 }
 
 impl<T: DataType> DictEncoder<T> {
     /// Creates new dictionary encoder.
-    pub fn new(desc: ColumnDescPtr, mem_tracker: MemTrackerPtr) -> Self {
-        let mut slots = Buffer::new().with_mem_tracker(mem_tracker.clone());
+    pub fn new(desc: ColumnDescPtr) -> Self {
+        let mut slots = vec![];
         slots.resize(INITIAL_HASH_TABLE_SIZE, -1);
         Self {
             desc,
             hash_table_size: INITIAL_HASH_TABLE_SIZE,
             mod_bitmask: (INITIAL_HASH_TABLE_SIZE - 1) as u32,
             hash_slots: slots,
-            buffered_indices: 
Buffer::new().with_mem_tracker(mem_tracker.clone()),
-            uniques: Buffer::new().with_mem_tracker(mem_tracker.clone()),
+            buffered_indices: vec![],
+            uniques: vec![],
             uniques_size_in_bytes: 0,
-            mem_tracker,
         }
     }
 
@@ -230,7 +222,7 @@ impl<T: DataType> DictEncoder<T> {
 
     /// Returns number of unique values (keys) in the dictionary.
     pub fn num_entries(&self) -> usize {
-        self.uniques.size()
+        self.uniques.len()
     }
 
     /// Returns size of unique values (keys) in the dictionary, in bytes.
@@ -242,9 +234,8 @@ impl<T: DataType> DictEncoder<T> {
     /// the result.
     #[inline]
     pub fn write_dict(&self) -> Result<ByteBufferPtr> {
-        let mut plain_encoder =
-            PlainEncoder::<T>::new(self.desc.clone(), 
self.mem_tracker.clone(), vec![]);
-        plain_encoder.put(self.uniques.data())?;
+        let mut plain_encoder = PlainEncoder::<T>::new(self.desc.clone(), 
vec![]);
+        plain_encoder.put(&self.uniques)?;
         plain_encoder.flush_buffer()
     }
 
@@ -255,12 +246,11 @@ impl<T: DataType> DictEncoder<T> {
         let buffer_len = self.estimated_data_encoded_size();
         let mut buffer: Vec<u8> = vec![0; buffer_len as usize];
         buffer[0] = self.bit_width() as u8;
-        self.mem_tracker.alloc(buffer.capacity() as i64);
 
         // Write bit width in the first byte
         buffer.write_all((self.bit_width() as u8).as_bytes())?;
         let mut encoder = RleEncoder::new_from_buf(self.bit_width(), buffer, 
1);
-        for index in self.buffered_indices.data() {
+        for index in &self.buffered_indices {
             if !encoder.put(*index as u64)? {
                 return Err(general_err!("Encoder doesn't have enough space"));
             }
@@ -293,7 +283,7 @@ impl<T: DataType> DictEncoder<T> {
 
     #[inline(never)]
     fn insert_fresh_slot(&mut self, slot: usize, value: T::T) -> i32 {
-        let index = self.uniques.size() as i32;
+        let index = self.uniques.len() as i32;
         self.hash_slots[slot] = index;
 
         let (base_size, num_elements) = value.dict_encoding_size();
@@ -307,7 +297,7 @@ impl<T: DataType> DictEncoder<T> {
         self.uniques_size_in_bytes += unique_size;
         self.uniques.push(value);
 
-        if self.uniques.size() > (self.hash_table_size as f32 * MAX_HASH_LOAD) 
as usize {
+        if self.uniques.len() > (self.hash_table_size as f32 * MAX_HASH_LOAD) 
as usize {
             self.double_table_size();
         }
 
@@ -316,7 +306,7 @@ impl<T: DataType> DictEncoder<T> {
 
     #[inline]
     fn bit_width(&self) -> u8 {
-        let num_entries = self.uniques.size();
+        let num_entries = self.uniques.len();
         if num_entries == 0 {
             0
         } else if num_entries == 1 {
@@ -328,7 +318,7 @@ impl<T: DataType> DictEncoder<T> {
 
     fn double_table_size(&mut self) {
         let new_size = self.hash_table_size * 2;
-        let mut new_hash_slots = 
Buffer::new().with_mem_tracker(self.mem_tracker.clone());
+        let mut new_hash_slots = vec![];
         new_hash_slots.resize(new_size, HASH_SLOT_EMPTY);
         for i in 0..self.hash_table_size {
             let index = self.hash_slots[i];
@@ -376,7 +366,7 @@ impl<T: DataType> Encoder<T> for DictEncoder<T> {
     fn estimated_data_encoded_size(&self) -> usize {
         let bit_width = self.bit_width();
         1 + RleEncoder::min_buffer_size(bit_width)
-            + RleEncoder::max_buffer_size(bit_width, 
self.buffered_indices.size())
+            + RleEncoder::max_buffer_size(bit_width, 
self.buffered_indices.len())
     }
 
     #[inline]
@@ -677,10 +667,9 @@ impl<T: DataType> Encoder<T> for DeltaBitPackEncoder<T> {
         // Write page header with total values
         self.write_page_header();
 
-        let mut buffer = ByteBuffer::new();
-        buffer.write_all(self.page_header_writer.flush_buffer())?;
-        buffer.write_all(self.bit_writer.flush_buffer())?;
-        buffer.flush()?;
+        let mut buffer = Vec::new();
+        buffer.extend_from_slice(self.page_header_writer.flush_buffer());
+        buffer.extend_from_slice(self.bit_writer.flush_buffer());
 
         // Reset state
         self.page_header_writer.clear();
@@ -690,7 +679,7 @@ impl<T: DataType> Encoder<T> for DeltaBitPackEncoder<T> {
         self.current_value = 0;
         self.values_in_block = 0;
 
-        Ok(buffer.consume())
+        Ok(buffer.into())
     }
 }
 
@@ -933,10 +922,7 @@ mod tests {
     use crate::schema::types::{
         ColumnDescPtr, ColumnDescriptor, ColumnPath, Type as SchemaType,
     };
-    use crate::util::{
-        memory::MemTracker,
-        test_common::{random_bytes, RandGen},
-    };
+    use crate::util::test_common::{random_bytes, RandGen};
 
     const TEST_SET_SIZE: usize = 1024;
 
@@ -1286,8 +1272,7 @@ mod tests {
         err: Option<ParquetError>,
     ) {
         let descr = create_test_col_desc_ptr(-1, T::get_physical_type());
-        let mem_tracker = Arc::new(MemTracker::new());
-        let encoder = get_encoder::<T>(descr, encoding, mem_tracker);
+        let encoder = get_encoder::<T>(descr, encoding);
         match err {
             Some(parquet_error) => {
                 assert!(encoder.is_err());
@@ -1319,8 +1304,7 @@ mod tests {
         enc: Encoding,
     ) -> Box<dyn Encoder<T>> {
         let desc = create_test_col_desc_ptr(type_len, T::get_physical_type());
-        let mem_tracker = Arc::new(MemTracker::new());
-        get_encoder(desc, enc, mem_tracker).unwrap()
+        get_encoder(desc, enc).unwrap()
     }
 
     fn create_test_decoder<T: DataType>(
@@ -1333,8 +1317,7 @@ mod tests {
 
     fn create_test_dict_encoder<T: DataType>(type_len: i32) -> DictEncoder<T> {
         let desc = create_test_col_desc_ptr(type_len, T::get_physical_type());
-        let mem_tracker = Arc::new(MemTracker::new());
-        DictEncoder::<T>::new(desc, mem_tracker)
+        DictEncoder::<T>::new(desc)
     }
 
     fn create_test_dict_decoder<T: DataType>() -> DictDecoder<T> {
diff --git a/parquet/src/encodings/levels.rs b/parquet/src/encodings/levels.rs
index deabbd440..c8682e06d 100644
--- a/parquet/src/encodings/levels.rs
+++ b/parquet/src/encodings/levels.rs
@@ -207,7 +207,7 @@ impl LevelDecoder {
                 let num_bytes =
                     ceil((num_buffered_values * bit_width as usize) as i64, 8);
                 let data_size = cmp::min(num_bytes as usize, data.len());
-                decoder.reset(data.range(data.start(), data_size));
+                decoder.reset(data.range(0, data_size));
                 data_size
             }
             _ => panic!(),
diff --git a/parquet/src/util/memory.rs b/parquet/src/util/memory.rs
index 923c45db1..0b0c707ff 100644
--- a/parquet/src/util/memory.rs
+++ b/parquet/src/util/memory.rs
@@ -17,403 +17,95 @@
 
 //! Utility methods and structs for working with memory.
 
+use bytes::Bytes;
 use std::{
     fmt::{Debug, Display, Formatter, Result as FmtResult},
-    io::{Result as IoResult, Write},
-    mem,
-    ops::{Index, IndexMut},
-    sync::{
-        atomic::{AtomicI64, Ordering},
-        Arc, Weak,
-    },
+    ops::Index,
 };
 
-// ----------------------------------------------------------------------
-// Memory Tracker classes
-
-/// Reference counted pointer for [`MemTracker`].
-pub type MemTrackerPtr = Arc<MemTracker>;
-/// Non-owning reference for [`MemTracker`].
-pub type WeakMemTrackerPtr = Weak<MemTracker>;
-
-/// Struct to track memory usage information.
-#[derive(Debug)]
-pub struct MemTracker {
-    // In the tuple, the first element is the current memory allocated (in 
bytes),
-    // and the second element is the maximum memory allocated so far (in 
bytes).
-    current_memory_usage: AtomicI64,
-    max_memory_usage: AtomicI64,
-}
-
-impl MemTracker {
-    /// Creates new memory tracker.
-    #[inline]
-    pub fn new() -> MemTracker {
-        MemTracker {
-            current_memory_usage: Default::default(),
-            max_memory_usage: Default::default(),
-        }
-    }
-
-    /// Returns the current memory consumption, in bytes.
-    pub fn memory_usage(&self) -> i64 {
-        self.current_memory_usage.load(Ordering::Acquire)
-    }
-
-    /// Returns the maximum memory consumption so far, in bytes.
-    pub fn max_memory_usage(&self) -> i64 {
-        self.max_memory_usage.load(Ordering::Acquire)
-    }
-
-    /// Adds `num_bytes` to the memory consumption tracked by this memory 
tracker.
-    #[inline]
-    pub fn alloc(&self, num_bytes: i64) {
-        let new_current = self
-            .current_memory_usage
-            .fetch_add(num_bytes, Ordering::Acquire)
-            + num_bytes;
-        self.max_memory_usage
-            .fetch_max(new_current, Ordering::Acquire);
-    }
-}
-
-// ----------------------------------------------------------------------
-// Buffer classes
-
-/// Type alias for [`Buffer`].
-pub type ByteBuffer = Buffer<u8>;
-/// Type alias for [`BufferPtr`].
-pub type ByteBufferPtr = BufferPtr<u8>;
-
-/// A resize-able buffer class with generic member, with optional memory 
tracker.
-///
-/// Note that a buffer has two attributes:
-/// `capacity` and `size`: the former is the total number of space reserved for
-/// the buffer, while the latter is the actual number of elements.
-/// Invariant: `capacity` >= `size`.
-/// The total allocated bytes for a buffer is equal to `capacity * sizeof<T>()`.
-pub struct Buffer<T: Clone> {
-    data: Vec<T>,
-    mem_tracker: Option<MemTrackerPtr>,
-    type_length: usize,
-}
-
-impl<T: Clone> Buffer<T> {
-    /// Creates new empty buffer.
-    pub fn new() -> Self {
-        Buffer {
-            data: vec![],
-            mem_tracker: None,
-            type_length: std::mem::size_of::<T>(),
-        }
-    }
-
-    /// Adds [`MemTracker`] for this buffer.
-    #[inline]
-    pub fn with_mem_tracker(mut self, mc: MemTrackerPtr) -> Self {
-        mc.alloc((self.data.capacity() * self.type_length) as i64);
-        self.mem_tracker = Some(mc);
-        self
-    }
-
-    /// Returns slice of data in this buffer.
-    #[inline]
-    pub fn data(&self) -> &[T] {
-        self.data.as_slice()
-    }
-
-    /// Sets data for this buffer.
-    #[inline]
-    pub fn set_data(&mut self, new_data: Vec<T>) {
-        if let Some(ref mc) = self.mem_tracker {
-            let capacity_diff = new_data.capacity() as i64 - 
self.data.capacity() as i64;
-            mc.alloc(capacity_diff * self.type_length as i64);
-        }
-        self.data = new_data;
-    }
-
-    /// Resizes underlying data in place to a new length `new_size`.
-    ///
-    /// If `new_size` is less than current length, data is truncated, 
otherwise, it is
-    /// extended to `new_size` with provided default value `init_value`.
-    ///
-    /// Memory tracker is also updated, if available.
-    #[inline]
-    pub fn resize(&mut self, new_size: usize, init_value: T) {
-        let old_capacity = self.data.capacity();
-        self.data.resize(new_size, init_value);
-        if let Some(ref mc) = self.mem_tracker {
-            let capacity_diff = self.data.capacity() as i64 - old_capacity as 
i64;
-            mc.alloc(capacity_diff * self.type_length as i64);
-        }
-    }
-
-    /// Clears underlying data.
-    #[inline]
-    pub fn clear(&mut self) {
-        self.data.clear()
-    }
-
-    /// Reserves capacity `additional_capacity` for underlying data vector.
-    ///
-    /// Memory tracker is also updated, if available.
-    #[inline]
-    pub fn reserve(&mut self, additional_capacity: usize) {
-        let old_capacity = self.data.capacity();
-        self.data.reserve(additional_capacity);
-        if self.data.capacity() > old_capacity {
-            if let Some(ref mc) = self.mem_tracker {
-                let capacity_diff = self.data.capacity() as i64 - old_capacity 
as i64;
-                mc.alloc(capacity_diff * self.type_length as i64);
-            }
-        }
-    }
-
-    /// Returns [`BufferPtr`] with buffer data.
-    /// Buffer data is reset.
-    #[inline]
-    pub fn consume(&mut self) -> BufferPtr<T> {
-        let old_data = mem::take(&mut self.data);
-        let mut result = BufferPtr::new(old_data);
-        if let Some(ref mc) = self.mem_tracker {
-            result = result.with_mem_tracker(mc.clone());
-        }
-        result
-    }
-
-    /// Adds `value` to the buffer.
-    #[inline]
-    pub fn push(&mut self, value: T) {
-        self.data.push(value)
-    }
-
-    /// Returns current capacity for the buffer.
-    #[inline]
-    pub fn capacity(&self) -> usize {
-        self.data.capacity()
-    }
-
-    /// Returns current size for the buffer.
-    #[inline]
-    pub fn size(&self) -> usize {
-        self.data.len()
-    }
-
-    /// Returns `true` if memory tracker is added to buffer, `false` otherwise.
-    #[inline]
-    pub fn is_mem_tracked(&self) -> bool {
-        self.mem_tracker.is_some()
-    }
-
-    /// Returns memory tracker associated with this buffer.
-    /// This may panic, if memory tracker is not set, use method above to 
check if
-    /// memory tracker is available.
-    #[inline]
-    pub fn mem_tracker(&self) -> &MemTrackerPtr {
-        self.mem_tracker.as_ref().unwrap()
-    }
-}
-
-impl<T: Sized + Clone> Index<usize> for Buffer<T> {
-    type Output = T;
-
-    fn index(&self, index: usize) -> &T {
-        &self.data[index]
-    }
-}
-
-impl<T: Sized + Clone> IndexMut<usize> for Buffer<T> {
-    fn index_mut(&mut self, index: usize) -> &mut T {
-        &mut self.data[index]
-    }
-}
-
-// TODO: implement this for other types
-impl Write for Buffer<u8> {
-    #[inline]
-    fn write(&mut self, buf: &[u8]) -> IoResult<usize> {
-        let old_capacity = self.data.capacity();
-        let bytes_written = self.data.write(buf)?;
-        if let Some(ref mc) = self.mem_tracker {
-            if self.data.capacity() - old_capacity > 0 {
-                mc.alloc((self.data.capacity() - old_capacity) as i64)
-            }
-        }
-        Ok(bytes_written)
-    }
-
-    fn flush(&mut self) -> IoResult<()> {
-        // No-op
-        self.data.flush()
-    }
-}
-
-impl AsRef<[u8]> for Buffer<u8> {
-    fn as_ref(&self) -> &[u8] {
-        self.data.as_slice()
-    }
-}
-
-impl<T: Clone> Drop for Buffer<T> {
-    #[inline]
-    fn drop(&mut self) {
-        if let Some(ref mc) = self.mem_tracker {
-            mc.alloc(-((self.data.capacity() * self.type_length) as i64));
-        }
-    }
-}
-
 // ----------------------------------------------------------------------
 // Immutable Buffer (BufferPtr) classes
 
 /// A representation of a slice on a reference-counting and read-only byte 
array.
 /// Sub-slices can be further created from this. The byte array will be 
released
 /// when all slices are dropped.
+///
+/// TODO: Remove and replace with [`bytes::Bytes`]
 #[allow(clippy::rc_buffer)]
 #[derive(Clone, Debug)]
-pub struct BufferPtr<T> {
-    data: Arc<Vec<T>>,
-    start: usize,
-    len: usize,
-    // TODO: will this create too many references? rethink about this.
-    mem_tracker: Option<MemTrackerPtr>,
+pub struct ByteBufferPtr {
+    data: Bytes,
 }
 
-impl<T> BufferPtr<T> {
+impl ByteBufferPtr {
     /// Creates new buffer from a vector.
-    pub fn new(v: Vec<T>) -> Self {
-        let len = v.len();
-        Self {
-            data: Arc::new(v),
-            start: 0,
-            len,
-            mem_tracker: None,
-        }
+    pub fn new(v: Vec<u8>) -> Self {
+        Self { data: v.into() }
     }
 
     /// Returns slice of data in this buffer.
     #[inline]
-    pub fn data(&self) -> &[T] {
-        &self.data[self.start..self.start + self.len]
-    }
-
-    /// Updates this buffer with new `start` position and length `len`.
-    ///
-    /// Range should be within current start position and length.
-    #[inline]
-    pub fn with_range(mut self, start: usize, len: usize) -> Self {
-        self.set_range(start, len);
-        self
-    }
-
-    /// Updates this buffer with new `start` position and length `len`.
-    ///
-    /// Range should be within current start position and length.
-    #[inline]
-    pub fn set_range(&mut self, start: usize, len: usize) {
-        assert!(self.start <= start && start + len <= self.start + self.len);
-        self.start = start;
-        self.len = len;
-    }
-
-    /// Adds memory tracker to this buffer.
-    pub fn with_mem_tracker(mut self, mc: MemTrackerPtr) -> Self {
-        self.mem_tracker = Some(mc);
-        self
-    }
-
-    /// Returns start position of this buffer.
-    #[inline]
-    pub fn start(&self) -> usize {
-        self.start
-    }
-
-    /// Returns the end position of this buffer
-    #[inline]
-    pub fn end(&self) -> usize {
-        self.start + self.len
+    pub fn data(&self) -> &[u8] {
+        &self.data
     }
 
     /// Returns length of this buffer
     #[inline]
     pub fn len(&self) -> usize {
-        self.len
+        self.data.len()
     }
 
     /// Returns whether this buffer is empty
     #[inline]
     pub fn is_empty(&self) -> bool {
-        self.len == 0
-    }
-
-    /// Returns `true` if this buffer has memory tracker, `false` otherwise.
-    pub fn is_mem_tracked(&self) -> bool {
-        self.mem_tracker.is_some()
+        self.data.is_empty()
     }
 
     /// Returns a shallow copy of the buffer.
     /// Reference counted pointer to the data is copied.
-    pub fn all(&self) -> BufferPtr<T> {
-        BufferPtr {
-            data: self.data.clone(),
-            start: self.start,
-            len: self.len,
-            mem_tracker: self.mem_tracker.as_ref().cloned(),
-        }
+    pub fn all(&self) -> Self {
+        self.clone()
     }
 
     /// Returns a shallow copy of the buffer that starts with `start` position.
-    pub fn start_from(&self, start: usize) -> BufferPtr<T> {
-        assert!(start <= self.len);
-        BufferPtr {
-            data: self.data.clone(),
-            start: self.start + start,
-            len: self.len - start,
-            mem_tracker: self.mem_tracker.as_ref().cloned(),
+    pub fn start_from(&self, start: usize) -> Self {
+        Self {
+            data: self.data.slice(start..),
         }
     }
 
     /// Returns a shallow copy that is a range slice within this buffer.
-    pub fn range(&self, start: usize, len: usize) -> BufferPtr<T> {
-        assert!(start + len <= self.len);
-        BufferPtr {
-            data: self.data.clone(),
-            start: self.start + start,
-            len,
-            mem_tracker: self.mem_tracker.as_ref().cloned(),
+    pub fn range(&self, start: usize, len: usize) -> Self {
+        Self {
+            data: self.data.slice(start..start + len),
         }
     }
 }
 
-impl<T: Sized> Index<usize> for BufferPtr<T> {
-    type Output = T;
+impl Index<usize> for ByteBufferPtr {
+    type Output = u8;
 
-    fn index(&self, index: usize) -> &T {
-        assert!(index < self.len);
-        &self.data[self.start + index]
+    fn index(&self, index: usize) -> &u8 {
+        &self.data[index]
     }
 }
 
-impl<T: Debug> Display for BufferPtr<T> {
+impl Display for ByteBufferPtr {
     fn fmt(&self, f: &mut Formatter) -> FmtResult {
         write!(f, "{:?}", self.data)
     }
 }
 
-impl<T> Drop for BufferPtr<T> {
-    fn drop(&mut self) {
-        if let Some(ref mc) = self.mem_tracker {
-            if Arc::strong_count(&self.data) == 1 && 
Arc::weak_count(&self.data) == 0 {
-                mc.alloc(-(self.data.capacity() as i64));
-            }
-        }
+impl AsRef<[u8]> for ByteBufferPtr {
+    #[inline]
+    fn as_ref(&self) -> &[u8] {
+        &self.data
     }
 }
 
-impl AsRef<[u8]> for BufferPtr<u8> {
-    #[inline]
-    fn as_ref(&self) -> &[u8] {
-        &self.data[self.start..self.start + self.len]
+impl From<Vec<u8>> for ByteBufferPtr {
+    fn from(data: Vec<u8>) -> Self {
+        Self { data: data.into() }
     }
 }
 
@@ -421,128 +113,23 @@ impl AsRef<[u8]> for BufferPtr<u8> {
 mod tests {
     use super::*;
 
-    #[test]
-    fn test_byte_buffer_mem_tracker() {
-        let mem_tracker = Arc::new(MemTracker::new());
-
-        let mut buffer = 
ByteBuffer::new().with_mem_tracker(mem_tracker.clone());
-        buffer.set_data(vec![0; 10]);
-        assert_eq!(mem_tracker.memory_usage(), buffer.capacity() as i64);
-        buffer.set_data(vec![0; 20]);
-        let capacity = buffer.capacity() as i64;
-        assert_eq!(mem_tracker.memory_usage(), capacity);
-
-        let max_capacity = {
-            let mut buffer2 = 
ByteBuffer::new().with_mem_tracker(mem_tracker.clone());
-            buffer2.reserve(30);
-            assert_eq!(
-                mem_tracker.memory_usage(),
-                buffer2.capacity() as i64 + capacity
-            );
-            buffer2.set_data(vec![0; 100]);
-            assert_eq!(
-                mem_tracker.memory_usage(),
-                buffer2.capacity() as i64 + capacity
-            );
-            buffer2.capacity() as i64 + capacity
-        };
-
-        assert_eq!(mem_tracker.memory_usage(), capacity);
-        assert_eq!(mem_tracker.max_memory_usage(), max_capacity);
-
-        buffer.reserve(40);
-        assert_eq!(mem_tracker.memory_usage(), buffer.capacity() as i64);
-
-        buffer.consume();
-        assert_eq!(mem_tracker.memory_usage(), buffer.capacity() as i64);
-    }
-
-    #[test]
-    fn test_byte_ptr_mem_tracker() {
-        let mem_tracker = Arc::new(MemTracker::new());
-
-        let mut buffer = 
ByteBuffer::new().with_mem_tracker(mem_tracker.clone());
-        buffer.set_data(vec![0; 60]);
-
-        {
-            let buffer_capacity = buffer.capacity() as i64;
-            let buf_ptr = buffer.consume();
-            assert_eq!(mem_tracker.memory_usage(), buffer_capacity);
-            {
-                let buf_ptr1 = buf_ptr.all();
-                {
-                    let _ = buf_ptr.start_from(20);
-                    assert_eq!(mem_tracker.memory_usage(), buffer_capacity);
-                }
-                assert_eq!(mem_tracker.memory_usage(), buffer_capacity);
-                let _ = buf_ptr1.range(30, 20);
-                assert_eq!(mem_tracker.memory_usage(), buffer_capacity);
-            }
-            assert_eq!(mem_tracker.memory_usage(), buffer_capacity);
-        }
-        assert_eq!(mem_tracker.memory_usage(), buffer.capacity() as i64);
-    }
-
-    #[test]
-    fn test_byte_buffer() {
-        let mut buffer = ByteBuffer::new();
-        assert_eq!(buffer.size(), 0);
-        assert_eq!(buffer.capacity(), 0);
-
-        let mut buffer2 = ByteBuffer::new();
-        buffer2.reserve(40);
-        assert_eq!(buffer2.size(), 0);
-        assert_eq!(buffer2.capacity(), 40);
-
-        buffer.set_data((0..5).collect());
-        assert_eq!(buffer.size(), 5);
-        assert_eq!(buffer[4], 4);
-
-        buffer.set_data((0..20).collect());
-        assert_eq!(buffer.size(), 20);
-        assert_eq!(buffer[10], 10);
-
-        let expected: Vec<u8> = (0..20).collect();
-        {
-            let data = buffer.data();
-            assert_eq!(data, expected.as_slice());
-        }
-
-        buffer.reserve(40);
-        assert!(buffer.capacity() >= 40);
-
-        let byte_ptr = buffer.consume();
-        assert_eq!(buffer.size(), 0);
-        assert_eq!(byte_ptr.as_ref(), expected.as_slice());
-
-        let values: Vec<u8> = (0..30).collect();
-        let _ = buffer.write(values.as_slice());
-        let _ = buffer.flush();
-
-        assert_eq!(buffer.data(), values.as_slice());
-    }
-
     #[test]
     fn test_byte_ptr() {
         let values = (0..50).collect();
         let ptr = ByteBufferPtr::new(values);
         assert_eq!(ptr.len(), 50);
-        assert_eq!(ptr.start(), 0);
         assert_eq!(ptr[40], 40);
 
         let ptr2 = ptr.all();
         assert_eq!(ptr2.len(), 50);
-        assert_eq!(ptr2.start(), 0);
         assert_eq!(ptr2[40], 40);
 
         let ptr3 = ptr.start_from(20);
         assert_eq!(ptr3.len(), 30);
-        assert_eq!(ptr3.start(), 20);
         assert_eq!(ptr3[0], 20);
 
         let ptr4 = ptr3.range(10, 10);
         assert_eq!(ptr4.len(), 10);
-        assert_eq!(ptr4.start(), 30);
         assert_eq!(ptr4[0], 30);
 
         let expected: Vec<u8> = (30..40).collect();
diff --git a/parquet/src/util/test_common/page_util.rs 
b/parquet/src/util/test_common/page_util.rs
index 1c0fd6283..ffa559f3f 100644
--- a/parquet/src/util/test_common/page_util.rs
+++ b/parquet/src/util/test_common/page_util.rs
@@ -25,13 +25,10 @@ use crate::encodings::levels::LevelEncoder;
 use crate::errors::Result;
 use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
 use crate::util::memory::ByteBufferPtr;
-use crate::util::memory::MemTracker;
-use crate::util::memory::MemTrackerPtr;
 use crate::util::test_common::random_numbers_range;
 use rand::distributions::uniform::SampleUniform;
 use std::collections::VecDeque;
 use std::mem;
-use std::sync::Arc;
 
 pub trait DataPageBuilder {
     fn add_rep_levels(&mut self, max_level: i16, rep_levels: &[i16]);
@@ -50,7 +47,6 @@ pub trait DataPageBuilder {
 pub struct DataPageBuilderImpl {
     desc: ColumnDescPtr,
     encoding: Option<Encoding>,
-    mem_tracker: MemTrackerPtr,
     num_values: u32,
     buffer: Vec<u8>,
     rep_levels_byte_len: u32,
@@ -66,7 +62,6 @@ impl DataPageBuilderImpl {
         DataPageBuilderImpl {
             desc,
             encoding: None,
-            mem_tracker: Arc::new(MemTracker::new()),
             num_values,
             buffer: vec![],
             rep_levels_byte_len: 0,
@@ -122,7 +117,7 @@ impl DataPageBuilder for DataPageBuilderImpl {
         );
         self.encoding = Some(encoding);
         let mut encoder: Box<dyn Encoder<T>> =
-            get_encoder::<T>(self.desc.clone(), encoding, 
self.mem_tracker.clone())
+            get_encoder::<T>(self.desc.clone(), encoding)
                 .expect("get_encoder() should be OK");
         encoder.put(values).expect("put() should be OK");
         let encoded_values = encoder
@@ -252,8 +247,7 @@ pub fn make_pages<T: DataType>(
     let max_def_level = desc.max_def_level();
     let max_rep_level = desc.max_rep_level();
 
-    let mem_tracker = Arc::new(MemTracker::new());
-    let mut dict_encoder = DictEncoder::<T>::new(desc.clone(), mem_tracker);
+    let mut dict_encoder = DictEncoder::<T>::new(desc.clone());
 
     for i in 0..num_pages {
         let mut num_values_cur_page = 0;

Reply via email to