This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new b9a41f3ff Use bytes in parquet (#1474) (#1683)
b9a41f3ff is described below
commit b9a41f3ff64c624ecc8b0c6045095becba02f370
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Wed May 11 17:54:07 2022 +0100
Use bytes in parquet (#1474) (#1683)
---
parquet/Cargo.toml | 1 +
parquet/src/arrow/array_reader.rs | 4 +-
parquet/src/arrow/array_reader/test_util.rs | 9 +-
parquet/src/column/reader.rs | 16 +-
parquet/src/column/writer.rs | 8 +-
parquet/src/data_type.rs | 13 +-
parquet/src/encodings/decoding.rs | 11 +-
parquet/src/encodings/encoding.rs | 83 ++---
parquet/src/encodings/levels.rs | 2 +-
parquet/src/util/memory.rs | 479 ++--------------------------
parquet/src/util/test_common/page_util.rs | 10 +-
11 files changed, 91 insertions(+), 545 deletions(-)
diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml
index 265f95197..94ed8bbc3 100644
--- a/parquet/Cargo.toml
+++ b/parquet/Cargo.toml
@@ -31,6 +31,7 @@ rust-version = "1.57"
[dependencies]
parquet-format = "4.0.0"
+bytes = "1.1"
byteorder = "1"
thrift = "0.13"
snap = { version = "1.0", optional = true }
diff --git a/parquet/src/arrow/array_reader.rs b/parquet/src/arrow/array_reader.rs
index 1d8441cbd..d2250f8ef 100644
--- a/parquet/src/arrow/array_reader.rs
+++ b/parquet/src/arrow/array_reader.rs
@@ -1381,7 +1381,6 @@ mod tests {
#[test]
fn test_complex_array_reader_dict_enc_string() {
use crate::encodings::encoding::{DictEncoder, Encoder};
- use crate::util::memory::MemTracker;
// Construct column schema
let message_type = "
message test_schema {
@@ -1412,9 +1411,8 @@ mod tests {
let mut all_values = Vec::with_capacity(num_pages * values_per_page);
for i in 0..num_pages {
- let mem_tracker = Arc::new(MemTracker::new());
let mut dict_encoder =
- DictEncoder::<ByteArrayType>::new(column_desc.clone(),
mem_tracker);
+ DictEncoder::<ByteArrayType>::new(column_desc.clone());
// add data page
let mut values = Vec::with_capacity(values_per_page);
diff --git a/parquet/src/arrow/array_reader/test_util.rs b/parquet/src/arrow/array_reader/test_util.rs
index afee4659c..0c044eb2d 100644
--- a/parquet/src/arrow/array_reader/test_util.rs
+++ b/parquet/src/arrow/array_reader/test_util.rs
@@ -29,7 +29,7 @@ use crate::errors::Result;
use crate::schema::types::{
ColumnDescPtr, ColumnDescriptor, ColumnPath, SchemaDescPtr, Type,
};
-use crate::util::memory::{ByteBufferPtr, MemTracker};
+use crate::util::memory::ByteBufferPtr;
/// Returns a descriptor for a UTF-8 column
pub fn utf8_column() -> ColumnDescPtr {
@@ -49,9 +49,7 @@ pub fn utf8_column() -> ColumnDescPtr {
/// Encode `data` with the provided `encoding`
pub fn encode_byte_array(encoding: Encoding, data: &[ByteArray]) ->
ByteBufferPtr {
let descriptor = utf8_column();
- let mem_tracker = Arc::new(MemTracker::new());
- let mut encoder =
- get_encoder::<ByteArrayType>(descriptor, encoding,
mem_tracker).unwrap();
+ let mut encoder = get_encoder::<ByteArrayType>(descriptor,
encoding).unwrap();
encoder.put(data).unwrap();
encoder.flush_buffer().unwrap()
@@ -59,8 +57,7 @@ pub fn encode_byte_array(encoding: Encoding, data:
&[ByteArray]) -> ByteBufferPt
/// Returns the encoded dictionary and value data
pub fn encode_dictionary(data: &[ByteArray]) -> (ByteBufferPtr, ByteBufferPtr)
{
- let mut dict_encoder =
- DictEncoder::<ByteArrayType>::new(utf8_column(),
Arc::new(MemTracker::new()));
+ let mut dict_encoder = DictEncoder::<ByteArrayType>::new(utf8_column());
dict_encoder.put(data).unwrap();
let encoded_rle = dict_encoder.flush_buffer().unwrap();
diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs
index 1fc722f29..3a45ecf3f 100644
--- a/parquet/src/column/reader.rs
+++ b/parquet/src/column/reader.rs
@@ -338,13 +338,13 @@ where
let mut offset = 0;
if max_rep_level > 0 {
- let level_data = parse_v1_level(
+ let (bytes_read, level_data) = parse_v1_level(
max_rep_level,
num_values,
rep_level_encoding,
buf.start_from(offset),
)?;
- offset = level_data.end();
+ offset += bytes_read;
let decoder =
R::new(max_rep_level, rep_level_encoding,
level_data);
@@ -353,13 +353,13 @@ where
}
if max_def_level > 0 {
- let level_data = parse_v1_level(
+ let (bytes_read, level_data) = parse_v1_level(
max_def_level,
num_values,
def_level_encoding,
buf.start_from(offset),
)?;
- offset = level_data.end();
+ offset += bytes_read;
let decoder =
D::new(max_def_level, def_level_encoding,
level_data);
@@ -460,20 +460,20 @@ fn parse_v1_level(
num_buffered_values: u32,
encoding: Encoding,
buf: ByteBufferPtr,
-) -> Result<ByteBufferPtr> {
+) -> Result<(usize, ByteBufferPtr)> {
match encoding {
Encoding::RLE => {
let i32_size = std::mem::size_of::<i32>();
let data_size = read_num_bytes!(i32, i32_size, buf.as_ref()) as
usize;
- Ok(buf.range(i32_size, data_size))
+ Ok((i32_size + data_size, buf.range(i32_size, data_size)))
}
Encoding::BIT_PACKED => {
let bit_width = crate::util::bit_util::log2(max_level as u64 + 1)
as u8;
let num_bytes = ceil(
(num_buffered_values as usize * bit_width as usize) as i64,
8,
- );
- Ok(buf.range(0, num_bytes as usize))
+ ) as usize;
+ Ok((num_bytes, buf.range(0, num_bytes)))
}
_ => Err(general_err!("invalid level encoding: {}", encoding)),
}
diff --git a/parquet/src/column/writer.rs b/parquet/src/column/writer.rs
index 13ea85157..a7d0ba8fc 100644
--- a/parquet/src/column/writer.rs
+++ b/parquet/src/column/writer.rs
@@ -16,7 +16,7 @@
// under the License.
//! Contains column writer API.
-use std::{cmp, collections::VecDeque, convert::TryFrom, marker::PhantomData,
sync::Arc};
+use std::{cmp, collections::VecDeque, convert::TryFrom, marker::PhantomData};
use crate::basic::{Compression, ConvertedType, Encoding, LogicalType,
PageType, Type};
use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter};
@@ -36,7 +36,7 @@ use crate::file::{
};
use crate::schema::types::ColumnDescPtr;
use crate::util::bit_util::FromBytes;
-use crate::util::memory::{ByteBufferPtr, MemTracker};
+use crate::util::memory::ByteBufferPtr;
/// Column writer for a Parquet type.
pub enum ColumnWriter {
@@ -213,7 +213,7 @@ impl<T: DataType> ColumnWriterImpl<T> {
let dict_encoder = if props.dictionary_enabled(descr.path())
&& has_dictionary_support(T::get_physical_type(), &props)
{
- Some(DictEncoder::new(descr.clone(), Arc::new(MemTracker::new())))
+ Some(DictEncoder::new(descr.clone()))
} else {
None
};
@@ -227,7 +227,6 @@ impl<T: DataType> ColumnWriterImpl<T> {
props
.encoding(descr.path())
.unwrap_or_else(|| fallback_encoding(T::get_physical_type(),
&props)),
- Arc::new(MemTracker::new()),
)
.unwrap();
@@ -1135,6 +1134,7 @@ fn compare_greater_byte_array_decimals(a: &[u8], b:
&[u8]) -> bool {
#[cfg(test)]
mod tests {
use rand::distributions::uniform::SampleUniform;
+ use std::sync::Arc;
use crate::column::{
page::PageReader,
diff --git a/parquet/src/data_type.rs b/parquet/src/data_type.rs
index ae1e01365..28645a262 100644
--- a/parquet/src/data_type.rs
+++ b/parquet/src/data_type.rs
@@ -31,7 +31,7 @@ use crate::column::writer::{ColumnWriter, ColumnWriterImpl};
use crate::errors::{ParquetError, Result};
use crate::util::{
bit_util::{from_ne_slice, FromBytes},
- memory::{ByteBuffer, ByteBufferPtr},
+ memory::ByteBufferPtr,
};
/// Rust representation for logical type INT96, value is backed by an array of
`u32`.
@@ -217,14 +217,6 @@ impl From<ByteBufferPtr> for ByteArray {
}
}
-impl From<ByteBuffer> for ByteArray {
- fn from(mut buf: ByteBuffer) -> ByteArray {
- Self {
- data: Some(buf.consume()),
- }
- }
-}
-
impl PartialEq for ByteArray {
fn eq(&self, other: &ByteArray) -> bool {
match (&self.data, &other.data) {
@@ -1322,8 +1314,7 @@ mod tests {
ByteArray::from(ByteBufferPtr::new(vec![1u8, 2u8, 3u8, 4u8,
5u8])).data(),
&[1u8, 2u8, 3u8, 4u8, 5u8]
);
- let mut buf = ByteBuffer::new();
- buf.set_data(vec![6u8, 7u8, 8u8, 9u8, 10u8]);
+ let buf = vec![6u8, 7u8, 8u8, 9u8, 10u8];
assert_eq!(ByteArray::from(buf).data(), &[6u8, 7u8, 8u8, 9u8, 10u8]);
}
diff --git a/parquet/src/encodings/decoding.rs b/parquet/src/encodings/decoding.rs
index 24e0c962e..7c95d5532 100644
--- a/parquet/src/encodings/decoding.rs
+++ b/parquet/src/encodings/decoding.rs
@@ -936,11 +936,7 @@ mod tests {
use crate::schema::types::{
ColumnDescPtr, ColumnDescriptor, ColumnPath, Type as SchemaType,
};
- use crate::util::{
- bit_util::set_array_bit,
- memory::{BufferPtr, MemTracker},
- test_common::RandGen,
- };
+ use crate::util::{bit_util::set_array_bit, test_common::RandGen};
#[test]
fn test_get_decoders() {
@@ -1389,7 +1385,7 @@ mod tests {
let length = data.len();
- let ptr = BufferPtr::new(data);
+ let ptr = ByteBufferPtr::new(data);
let mut reader = BitReader::new(ptr.clone());
assert_eq!(reader.get_vlq_int().unwrap(), 256);
assert_eq!(reader.get_vlq_int().unwrap(), 4);
@@ -1472,8 +1468,7 @@ mod tests {
// Encode data
let mut encoder =
- get_encoder::<T>(col_descr.clone(), encoding,
Arc::new(MemTracker::new()))
- .expect("get encoder");
+ get_encoder::<T>(col_descr.clone(), encoding).expect("get
encoder");
for v in &data[..] {
encoder.put(&v[..]).expect("ok to encode");
diff --git a/parquet/src/encodings/encoding.rs b/parquet/src/encodings/encoding.rs
index f4f304625..4b9cf2e9b 100644
--- a/parquet/src/encodings/encoding.rs
+++ b/parquet/src/encodings/encoding.rs
@@ -28,7 +28,7 @@ use crate::schema::types::ColumnDescPtr;
use crate::util::{
bit_util::{self, log2, num_required_bits, BitWriter},
hash_util,
- memory::{Buffer, ByteBuffer, ByteBufferPtr, MemTrackerPtr},
+ memory::ByteBufferPtr,
};
// ----------------------------------------------------------------------
@@ -76,10 +76,9 @@ pub trait Encoder<T: DataType> {
pub fn get_encoder<T: DataType>(
desc: ColumnDescPtr,
encoding: Encoding,
- mem_tracker: MemTrackerPtr,
) -> Result<Box<dyn Encoder<T>>> {
let encoder: Box<dyn Encoder<T>> = match encoding {
- Encoding::PLAIN => Box::new(PlainEncoder::new(desc, mem_tracker,
vec![])),
+ Encoding::PLAIN => Box::new(PlainEncoder::new(desc, vec![])),
Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => {
return Err(general_err!(
"Cannot initialize this encoding through this function"
@@ -109,7 +108,7 @@ pub fn get_encoder<T: DataType>(
/// - BYTE_ARRAY - 4 byte length stored as little endian, followed by bytes.
/// - FIXED_LEN_BYTE_ARRAY - just the bytes are stored.
pub struct PlainEncoder<T: DataType> {
- buffer: ByteBuffer,
+ buffer: Vec<u8>,
bit_writer: BitWriter,
desc: ColumnDescPtr,
_phantom: PhantomData<T>,
@@ -117,11 +116,9 @@ pub struct PlainEncoder<T: DataType> {
impl<T: DataType> PlainEncoder<T> {
/// Creates new plain encoder.
- pub fn new(desc: ColumnDescPtr, mem_tracker: MemTrackerPtr, vec: Vec<u8>)
-> Self {
- let mut byte_buffer = ByteBuffer::new().with_mem_tracker(mem_tracker);
- byte_buffer.set_data(vec);
+ pub fn new(desc: ColumnDescPtr, buffer: Vec<u8>) -> Self {
Self {
- buffer: byte_buffer,
+ buffer,
bit_writer: BitWriter::new(256),
desc,
_phantom: PhantomData,
@@ -139,16 +136,15 @@ impl<T: DataType> Encoder<T> for PlainEncoder<T> {
}
fn estimated_data_encoded_size(&self) -> usize {
- self.buffer.size() + self.bit_writer.bytes_written()
+ self.buffer.len() + self.bit_writer.bytes_written()
}
#[inline]
fn flush_buffer(&mut self) -> Result<ByteBufferPtr> {
- self.buffer.write_all(self.bit_writer.flush_buffer())?;
- self.buffer.flush()?;
+ self.buffer
+ .extend_from_slice(self.bit_writer.flush_buffer());
self.bit_writer.clear();
-
- Ok(self.buffer.consume())
+ Ok(std::mem::take(&mut self.buffer).into())
}
#[inline]
@@ -189,35 +185,31 @@ pub struct DictEncoder<T: DataType> {
// Stores indices which map (many-to-one) to the values in the `uniques`
array.
// Here we are using fix-sized array with linear probing.
// A slot with `HASH_SLOT_EMPTY` indicates the slot is not currently
occupied.
- hash_slots: Buffer<i32>,
+ hash_slots: Vec<i32>,
// Indices that have not yet be written out by `write_indices()`.
- buffered_indices: Buffer<i32>,
+ buffered_indices: Vec<i32>,
// The unique observed values.
- uniques: Buffer<T::T>,
+ uniques: Vec<T::T>,
// Size in bytes needed to encode this dictionary.
uniques_size_in_bytes: usize,
-
- // Tracking memory usage for the various data structures in this struct.
- mem_tracker: MemTrackerPtr,
}
impl<T: DataType> DictEncoder<T> {
/// Creates new dictionary encoder.
- pub fn new(desc: ColumnDescPtr, mem_tracker: MemTrackerPtr) -> Self {
- let mut slots = Buffer::new().with_mem_tracker(mem_tracker.clone());
+ pub fn new(desc: ColumnDescPtr) -> Self {
+ let mut slots = vec![];
slots.resize(INITIAL_HASH_TABLE_SIZE, -1);
Self {
desc,
hash_table_size: INITIAL_HASH_TABLE_SIZE,
mod_bitmask: (INITIAL_HASH_TABLE_SIZE - 1) as u32,
hash_slots: slots,
- buffered_indices:
Buffer::new().with_mem_tracker(mem_tracker.clone()),
- uniques: Buffer::new().with_mem_tracker(mem_tracker.clone()),
+ buffered_indices: vec![],
+ uniques: vec![],
uniques_size_in_bytes: 0,
- mem_tracker,
}
}
@@ -230,7 +222,7 @@ impl<T: DataType> DictEncoder<T> {
/// Returns number of unique values (keys) in the dictionary.
pub fn num_entries(&self) -> usize {
- self.uniques.size()
+ self.uniques.len()
}
/// Returns size of unique values (keys) in the dictionary, in bytes.
@@ -242,9 +234,8 @@ impl<T: DataType> DictEncoder<T> {
/// the result.
#[inline]
pub fn write_dict(&self) -> Result<ByteBufferPtr> {
- let mut plain_encoder =
- PlainEncoder::<T>::new(self.desc.clone(),
self.mem_tracker.clone(), vec![]);
- plain_encoder.put(self.uniques.data())?;
+ let mut plain_encoder = PlainEncoder::<T>::new(self.desc.clone(),
vec![]);
+ plain_encoder.put(&self.uniques)?;
plain_encoder.flush_buffer()
}
@@ -255,12 +246,11 @@ impl<T: DataType> DictEncoder<T> {
let buffer_len = self.estimated_data_encoded_size();
let mut buffer: Vec<u8> = vec![0; buffer_len as usize];
buffer[0] = self.bit_width() as u8;
- self.mem_tracker.alloc(buffer.capacity() as i64);
// Write bit width in the first byte
buffer.write_all((self.bit_width() as u8).as_bytes())?;
let mut encoder = RleEncoder::new_from_buf(self.bit_width(), buffer,
1);
- for index in self.buffered_indices.data() {
+ for index in &self.buffered_indices {
if !encoder.put(*index as u64)? {
return Err(general_err!("Encoder doesn't have enough space"));
}
@@ -293,7 +283,7 @@ impl<T: DataType> DictEncoder<T> {
#[inline(never)]
fn insert_fresh_slot(&mut self, slot: usize, value: T::T) -> i32 {
- let index = self.uniques.size() as i32;
+ let index = self.uniques.len() as i32;
self.hash_slots[slot] = index;
let (base_size, num_elements) = value.dict_encoding_size();
@@ -307,7 +297,7 @@ impl<T: DataType> DictEncoder<T> {
self.uniques_size_in_bytes += unique_size;
self.uniques.push(value);
- if self.uniques.size() > (self.hash_table_size as f32 * MAX_HASH_LOAD)
as usize {
+ if self.uniques.len() > (self.hash_table_size as f32 * MAX_HASH_LOAD)
as usize {
self.double_table_size();
}
@@ -316,7 +306,7 @@ impl<T: DataType> DictEncoder<T> {
#[inline]
fn bit_width(&self) -> u8 {
- let num_entries = self.uniques.size();
+ let num_entries = self.uniques.len();
if num_entries == 0 {
0
} else if num_entries == 1 {
@@ -328,7 +318,7 @@ impl<T: DataType> DictEncoder<T> {
fn double_table_size(&mut self) {
let new_size = self.hash_table_size * 2;
- let mut new_hash_slots =
Buffer::new().with_mem_tracker(self.mem_tracker.clone());
+ let mut new_hash_slots = vec![];
new_hash_slots.resize(new_size, HASH_SLOT_EMPTY);
for i in 0..self.hash_table_size {
let index = self.hash_slots[i];
@@ -376,7 +366,7 @@ impl<T: DataType> Encoder<T> for DictEncoder<T> {
fn estimated_data_encoded_size(&self) -> usize {
let bit_width = self.bit_width();
1 + RleEncoder::min_buffer_size(bit_width)
- + RleEncoder::max_buffer_size(bit_width,
self.buffered_indices.size())
+ + RleEncoder::max_buffer_size(bit_width,
self.buffered_indices.len())
}
#[inline]
@@ -677,10 +667,9 @@ impl<T: DataType> Encoder<T> for DeltaBitPackEncoder<T> {
// Write page header with total values
self.write_page_header();
- let mut buffer = ByteBuffer::new();
- buffer.write_all(self.page_header_writer.flush_buffer())?;
- buffer.write_all(self.bit_writer.flush_buffer())?;
- buffer.flush()?;
+ let mut buffer = Vec::new();
+ buffer.extend_from_slice(self.page_header_writer.flush_buffer());
+ buffer.extend_from_slice(self.bit_writer.flush_buffer());
// Reset state
self.page_header_writer.clear();
@@ -690,7 +679,7 @@ impl<T: DataType> Encoder<T> for DeltaBitPackEncoder<T> {
self.current_value = 0;
self.values_in_block = 0;
- Ok(buffer.consume())
+ Ok(buffer.into())
}
}
@@ -933,10 +922,7 @@ mod tests {
use crate::schema::types::{
ColumnDescPtr, ColumnDescriptor, ColumnPath, Type as SchemaType,
};
- use crate::util::{
- memory::MemTracker,
- test_common::{random_bytes, RandGen},
- };
+ use crate::util::test_common::{random_bytes, RandGen};
const TEST_SET_SIZE: usize = 1024;
@@ -1286,8 +1272,7 @@ mod tests {
err: Option<ParquetError>,
) {
let descr = create_test_col_desc_ptr(-1, T::get_physical_type());
- let mem_tracker = Arc::new(MemTracker::new());
- let encoder = get_encoder::<T>(descr, encoding, mem_tracker);
+ let encoder = get_encoder::<T>(descr, encoding);
match err {
Some(parquet_error) => {
assert!(encoder.is_err());
@@ -1319,8 +1304,7 @@ mod tests {
enc: Encoding,
) -> Box<dyn Encoder<T>> {
let desc = create_test_col_desc_ptr(type_len, T::get_physical_type());
- let mem_tracker = Arc::new(MemTracker::new());
- get_encoder(desc, enc, mem_tracker).unwrap()
+ get_encoder(desc, enc).unwrap()
}
fn create_test_decoder<T: DataType>(
@@ -1333,8 +1317,7 @@ mod tests {
fn create_test_dict_encoder<T: DataType>(type_len: i32) -> DictEncoder<T> {
let desc = create_test_col_desc_ptr(type_len, T::get_physical_type());
- let mem_tracker = Arc::new(MemTracker::new());
- DictEncoder::<T>::new(desc, mem_tracker)
+ DictEncoder::<T>::new(desc)
}
fn create_test_dict_decoder<T: DataType>() -> DictDecoder<T> {
diff --git a/parquet/src/encodings/levels.rs b/parquet/src/encodings/levels.rs
index deabbd440..c8682e06d 100644
--- a/parquet/src/encodings/levels.rs
+++ b/parquet/src/encodings/levels.rs
@@ -207,7 +207,7 @@ impl LevelDecoder {
let num_bytes =
ceil((num_buffered_values * bit_width as usize) as i64, 8);
let data_size = cmp::min(num_bytes as usize, data.len());
- decoder.reset(data.range(data.start(), data_size));
+ decoder.reset(data.range(0, data_size));
data_size
}
_ => panic!(),
diff --git a/parquet/src/util/memory.rs b/parquet/src/util/memory.rs
index 923c45db1..0b0c707ff 100644
--- a/parquet/src/util/memory.rs
+++ b/parquet/src/util/memory.rs
@@ -17,403 +17,95 @@
//! Utility methods and structs for working with memory.
+use bytes::Bytes;
use std::{
fmt::{Debug, Display, Formatter, Result as FmtResult},
- io::{Result as IoResult, Write},
- mem,
- ops::{Index, IndexMut},
- sync::{
- atomic::{AtomicI64, Ordering},
- Arc, Weak,
- },
+ ops::Index,
};
-// ----------------------------------------------------------------------
-// Memory Tracker classes
-
-/// Reference counted pointer for [`MemTracker`].
-pub type MemTrackerPtr = Arc<MemTracker>;
-/// Non-owning reference for [`MemTracker`].
-pub type WeakMemTrackerPtr = Weak<MemTracker>;
-
-/// Struct to track memory usage information.
-#[derive(Debug)]
-pub struct MemTracker {
- // In the tuple, the first element is the current memory allocated (in
bytes),
- // and the second element is the maximum memory allocated so far (in
bytes).
- current_memory_usage: AtomicI64,
- max_memory_usage: AtomicI64,
-}
-
-impl MemTracker {
- /// Creates new memory tracker.
- #[inline]
- pub fn new() -> MemTracker {
- MemTracker {
- current_memory_usage: Default::default(),
- max_memory_usage: Default::default(),
- }
- }
-
- /// Returns the current memory consumption, in bytes.
- pub fn memory_usage(&self) -> i64 {
- self.current_memory_usage.load(Ordering::Acquire)
- }
-
- /// Returns the maximum memory consumption so far, in bytes.
- pub fn max_memory_usage(&self) -> i64 {
- self.max_memory_usage.load(Ordering::Acquire)
- }
-
- /// Adds `num_bytes` to the memory consumption tracked by this memory
tracker.
- #[inline]
- pub fn alloc(&self, num_bytes: i64) {
- let new_current = self
- .current_memory_usage
- .fetch_add(num_bytes, Ordering::Acquire)
- + num_bytes;
- self.max_memory_usage
- .fetch_max(new_current, Ordering::Acquire);
- }
-}
-
-// ----------------------------------------------------------------------
-// Buffer classes
-
-/// Type alias for [`Buffer`].
-pub type ByteBuffer = Buffer<u8>;
-/// Type alias for [`BufferPtr`].
-pub type ByteBufferPtr = BufferPtr<u8>;
-
-/// A resize-able buffer class with generic member, with optional memory
tracker.
-///
-/// Note that a buffer has two attributes:
-/// `capacity` and `size`: the former is the total number of space reserved for
-/// the buffer, while the latter is the actual number of elements.
-/// Invariant: `capacity` >= `size`.
-/// The total allocated bytes for a buffer equals to `capacity * sizeof<T>()`.
-pub struct Buffer<T: Clone> {
- data: Vec<T>,
- mem_tracker: Option<MemTrackerPtr>,
- type_length: usize,
-}
-
-impl<T: Clone> Buffer<T> {
- /// Creates new empty buffer.
- pub fn new() -> Self {
- Buffer {
- data: vec![],
- mem_tracker: None,
- type_length: std::mem::size_of::<T>(),
- }
- }
-
- /// Adds [`MemTracker`] for this buffer.
- #[inline]
- pub fn with_mem_tracker(mut self, mc: MemTrackerPtr) -> Self {
- mc.alloc((self.data.capacity() * self.type_length) as i64);
- self.mem_tracker = Some(mc);
- self
- }
-
- /// Returns slice of data in this buffer.
- #[inline]
- pub fn data(&self) -> &[T] {
- self.data.as_slice()
- }
-
- /// Sets data for this buffer.
- #[inline]
- pub fn set_data(&mut self, new_data: Vec<T>) {
- if let Some(ref mc) = self.mem_tracker {
- let capacity_diff = new_data.capacity() as i64 -
self.data.capacity() as i64;
- mc.alloc(capacity_diff * self.type_length as i64);
- }
- self.data = new_data;
- }
-
- /// Resizes underlying data in place to a new length `new_size`.
- ///
- /// If `new_size` is less than current length, data is truncated,
otherwise, it is
- /// extended to `new_size` with provided default value `init_value`.
- ///
- /// Memory tracker is also updated, if available.
- #[inline]
- pub fn resize(&mut self, new_size: usize, init_value: T) {
- let old_capacity = self.data.capacity();
- self.data.resize(new_size, init_value);
- if let Some(ref mc) = self.mem_tracker {
- let capacity_diff = self.data.capacity() as i64 - old_capacity as
i64;
- mc.alloc(capacity_diff * self.type_length as i64);
- }
- }
-
- /// Clears underlying data.
- #[inline]
- pub fn clear(&mut self) {
- self.data.clear()
- }
-
- /// Reserves capacity `additional_capacity` for underlying data vector.
- ///
- /// Memory tracker is also updated, if available.
- #[inline]
- pub fn reserve(&mut self, additional_capacity: usize) {
- let old_capacity = self.data.capacity();
- self.data.reserve(additional_capacity);
- if self.data.capacity() > old_capacity {
- if let Some(ref mc) = self.mem_tracker {
- let capacity_diff = self.data.capacity() as i64 - old_capacity
as i64;
- mc.alloc(capacity_diff * self.type_length as i64);
- }
- }
- }
-
- /// Returns [`BufferPtr`] with buffer data.
- /// Buffer data is reset.
- #[inline]
- pub fn consume(&mut self) -> BufferPtr<T> {
- let old_data = mem::take(&mut self.data);
- let mut result = BufferPtr::new(old_data);
- if let Some(ref mc) = self.mem_tracker {
- result = result.with_mem_tracker(mc.clone());
- }
- result
- }
-
- /// Adds `value` to the buffer.
- #[inline]
- pub fn push(&mut self, value: T) {
- self.data.push(value)
- }
-
- /// Returns current capacity for the buffer.
- #[inline]
- pub fn capacity(&self) -> usize {
- self.data.capacity()
- }
-
- /// Returns current size for the buffer.
- #[inline]
- pub fn size(&self) -> usize {
- self.data.len()
- }
-
- /// Returns `true` if memory tracker is added to buffer, `false` otherwise.
- #[inline]
- pub fn is_mem_tracked(&self) -> bool {
- self.mem_tracker.is_some()
- }
-
- /// Returns memory tracker associated with this buffer.
- /// This may panic, if memory tracker is not set, use method above to
check if
- /// memory tracker is available.
- #[inline]
- pub fn mem_tracker(&self) -> &MemTrackerPtr {
- self.mem_tracker.as_ref().unwrap()
- }
-}
-
-impl<T: Sized + Clone> Index<usize> for Buffer<T> {
- type Output = T;
-
- fn index(&self, index: usize) -> &T {
- &self.data[index]
- }
-}
-
-impl<T: Sized + Clone> IndexMut<usize> for Buffer<T> {
- fn index_mut(&mut self, index: usize) -> &mut T {
- &mut self.data[index]
- }
-}
-
-// TODO: implement this for other types
-impl Write for Buffer<u8> {
- #[inline]
- fn write(&mut self, buf: &[u8]) -> IoResult<usize> {
- let old_capacity = self.data.capacity();
- let bytes_written = self.data.write(buf)?;
- if let Some(ref mc) = self.mem_tracker {
- if self.data.capacity() - old_capacity > 0 {
- mc.alloc((self.data.capacity() - old_capacity) as i64)
- }
- }
- Ok(bytes_written)
- }
-
- fn flush(&mut self) -> IoResult<()> {
- // No-op
- self.data.flush()
- }
-}
-
-impl AsRef<[u8]> for Buffer<u8> {
- fn as_ref(&self) -> &[u8] {
- self.data.as_slice()
- }
-}
-
-impl<T: Clone> Drop for Buffer<T> {
- #[inline]
- fn drop(&mut self) {
- if let Some(ref mc) = self.mem_tracker {
- mc.alloc(-((self.data.capacity() * self.type_length) as i64));
- }
- }
-}
-
// ----------------------------------------------------------------------
// Immutable Buffer (BufferPtr) classes
/// An representation of a slice on a reference-counting and read-only byte
array.
/// Sub-slices can be further created from this. The byte array will be
released
/// when all slices are dropped.
+///
+/// TODO: Remove and replace with [`bytes::Bytes`]
#[allow(clippy::rc_buffer)]
#[derive(Clone, Debug)]
-pub struct BufferPtr<T> {
- data: Arc<Vec<T>>,
- start: usize,
- len: usize,
- // TODO: will this create too many references? rethink about this.
- mem_tracker: Option<MemTrackerPtr>,
+pub struct ByteBufferPtr {
+ data: Bytes,
}
-impl<T> BufferPtr<T> {
+impl ByteBufferPtr {
/// Creates new buffer from a vector.
- pub fn new(v: Vec<T>) -> Self {
- let len = v.len();
- Self {
- data: Arc::new(v),
- start: 0,
- len,
- mem_tracker: None,
- }
+ pub fn new(v: Vec<u8>) -> Self {
+ Self { data: v.into() }
}
/// Returns slice of data in this buffer.
#[inline]
- pub fn data(&self) -> &[T] {
- &self.data[self.start..self.start + self.len]
- }
-
- /// Updates this buffer with new `start` position and length `len`.
- ///
- /// Range should be within current start position and length.
- #[inline]
- pub fn with_range(mut self, start: usize, len: usize) -> Self {
- self.set_range(start, len);
- self
- }
-
- /// Updates this buffer with new `start` position and length `len`.
- ///
- /// Range should be within current start position and length.
- #[inline]
- pub fn set_range(&mut self, start: usize, len: usize) {
- assert!(self.start <= start && start + len <= self.start + self.len);
- self.start = start;
- self.len = len;
- }
-
- /// Adds memory tracker to this buffer.
- pub fn with_mem_tracker(mut self, mc: MemTrackerPtr) -> Self {
- self.mem_tracker = Some(mc);
- self
- }
-
- /// Returns start position of this buffer.
- #[inline]
- pub fn start(&self) -> usize {
- self.start
- }
-
- /// Returns the end position of this buffer
- #[inline]
- pub fn end(&self) -> usize {
- self.start + self.len
+ pub fn data(&self) -> &[u8] {
+ &self.data
}
/// Returns length of this buffer
#[inline]
pub fn len(&self) -> usize {
- self.len
+ self.data.len()
}
/// Returns whether this buffer is empty
#[inline]
pub fn is_empty(&self) -> bool {
- self.len == 0
- }
-
- /// Returns `true` if this buffer has memory tracker, `false` otherwise.
- pub fn is_mem_tracked(&self) -> bool {
- self.mem_tracker.is_some()
+ self.data.is_empty()
}
/// Returns a shallow copy of the buffer.
/// Reference counted pointer to the data is copied.
- pub fn all(&self) -> BufferPtr<T> {
- BufferPtr {
- data: self.data.clone(),
- start: self.start,
- len: self.len,
- mem_tracker: self.mem_tracker.as_ref().cloned(),
- }
+ pub fn all(&self) -> Self {
+ self.clone()
}
/// Returns a shallow copy of the buffer that starts with `start` position.
- pub fn start_from(&self, start: usize) -> BufferPtr<T> {
- assert!(start <= self.len);
- BufferPtr {
- data: self.data.clone(),
- start: self.start + start,
- len: self.len - start,
- mem_tracker: self.mem_tracker.as_ref().cloned(),
+ pub fn start_from(&self, start: usize) -> Self {
+ Self {
+ data: self.data.slice(start..),
}
}
/// Returns a shallow copy that is a range slice within this buffer.
- pub fn range(&self, start: usize, len: usize) -> BufferPtr<T> {
- assert!(start + len <= self.len);
- BufferPtr {
- data: self.data.clone(),
- start: self.start + start,
- len,
- mem_tracker: self.mem_tracker.as_ref().cloned(),
+ pub fn range(&self, start: usize, len: usize) -> Self {
+ Self {
+ data: self.data.slice(start..start + len),
}
}
}
-impl<T: Sized> Index<usize> for BufferPtr<T> {
- type Output = T;
+impl Index<usize> for ByteBufferPtr {
+ type Output = u8;
- fn index(&self, index: usize) -> &T {
- assert!(index < self.len);
- &self.data[self.start + index]
+ fn index(&self, index: usize) -> &u8 {
+ &self.data[index]
}
}
-impl<T: Debug> Display for BufferPtr<T> {
+impl Display for ByteBufferPtr {
fn fmt(&self, f: &mut Formatter) -> FmtResult {
write!(f, "{:?}", self.data)
}
}
-impl<T> Drop for BufferPtr<T> {
- fn drop(&mut self) {
- if let Some(ref mc) = self.mem_tracker {
- if Arc::strong_count(&self.data) == 1 &&
Arc::weak_count(&self.data) == 0 {
- mc.alloc(-(self.data.capacity() as i64));
- }
- }
+impl AsRef<[u8]> for ByteBufferPtr {
+ #[inline]
+ fn as_ref(&self) -> &[u8] {
+ &self.data
}
}
-impl AsRef<[u8]> for BufferPtr<u8> {
- #[inline]
- fn as_ref(&self) -> &[u8] {
- &self.data[self.start..self.start + self.len]
+impl From<Vec<u8>> for ByteBufferPtr {
+ fn from(data: Vec<u8>) -> Self {
+ Self { data: data.into() }
}
}
@@ -421,128 +113,23 @@ impl AsRef<[u8]> for BufferPtr<u8> {
mod tests {
use super::*;
- #[test]
- fn test_byte_buffer_mem_tracker() {
- let mem_tracker = Arc::new(MemTracker::new());
-
- let mut buffer =
ByteBuffer::new().with_mem_tracker(mem_tracker.clone());
- buffer.set_data(vec![0; 10]);
- assert_eq!(mem_tracker.memory_usage(), buffer.capacity() as i64);
- buffer.set_data(vec![0; 20]);
- let capacity = buffer.capacity() as i64;
- assert_eq!(mem_tracker.memory_usage(), capacity);
-
- let max_capacity = {
- let mut buffer2 =
ByteBuffer::new().with_mem_tracker(mem_tracker.clone());
- buffer2.reserve(30);
- assert_eq!(
- mem_tracker.memory_usage(),
- buffer2.capacity() as i64 + capacity
- );
- buffer2.set_data(vec![0; 100]);
- assert_eq!(
- mem_tracker.memory_usage(),
- buffer2.capacity() as i64 + capacity
- );
- buffer2.capacity() as i64 + capacity
- };
-
- assert_eq!(mem_tracker.memory_usage(), capacity);
- assert_eq!(mem_tracker.max_memory_usage(), max_capacity);
-
- buffer.reserve(40);
- assert_eq!(mem_tracker.memory_usage(), buffer.capacity() as i64);
-
- buffer.consume();
- assert_eq!(mem_tracker.memory_usage(), buffer.capacity() as i64);
- }
-
- #[test]
- fn test_byte_ptr_mem_tracker() {
- let mem_tracker = Arc::new(MemTracker::new());
-
- let mut buffer =
ByteBuffer::new().with_mem_tracker(mem_tracker.clone());
- buffer.set_data(vec![0; 60]);
-
- {
- let buffer_capacity = buffer.capacity() as i64;
- let buf_ptr = buffer.consume();
- assert_eq!(mem_tracker.memory_usage(), buffer_capacity);
- {
- let buf_ptr1 = buf_ptr.all();
- {
- let _ = buf_ptr.start_from(20);
- assert_eq!(mem_tracker.memory_usage(), buffer_capacity);
- }
- assert_eq!(mem_tracker.memory_usage(), buffer_capacity);
- let _ = buf_ptr1.range(30, 20);
- assert_eq!(mem_tracker.memory_usage(), buffer_capacity);
- }
- assert_eq!(mem_tracker.memory_usage(), buffer_capacity);
- }
- assert_eq!(mem_tracker.memory_usage(), buffer.capacity() as i64);
- }
-
- #[test]
- fn test_byte_buffer() {
- let mut buffer = ByteBuffer::new();
- assert_eq!(buffer.size(), 0);
- assert_eq!(buffer.capacity(), 0);
-
- let mut buffer2 = ByteBuffer::new();
- buffer2.reserve(40);
- assert_eq!(buffer2.size(), 0);
- assert_eq!(buffer2.capacity(), 40);
-
- buffer.set_data((0..5).collect());
- assert_eq!(buffer.size(), 5);
- assert_eq!(buffer[4], 4);
-
- buffer.set_data((0..20).collect());
- assert_eq!(buffer.size(), 20);
- assert_eq!(buffer[10], 10);
-
- let expected: Vec<u8> = (0..20).collect();
- {
- let data = buffer.data();
- assert_eq!(data, expected.as_slice());
- }
-
- buffer.reserve(40);
- assert!(buffer.capacity() >= 40);
-
- let byte_ptr = buffer.consume();
- assert_eq!(buffer.size(), 0);
- assert_eq!(byte_ptr.as_ref(), expected.as_slice());
-
- let values: Vec<u8> = (0..30).collect();
- let _ = buffer.write(values.as_slice());
- let _ = buffer.flush();
-
- assert_eq!(buffer.data(), values.as_slice());
- }
-
#[test]
fn test_byte_ptr() {
let values = (0..50).collect();
let ptr = ByteBufferPtr::new(values);
assert_eq!(ptr.len(), 50);
- assert_eq!(ptr.start(), 0);
assert_eq!(ptr[40], 40);
let ptr2 = ptr.all();
assert_eq!(ptr2.len(), 50);
- assert_eq!(ptr2.start(), 0);
assert_eq!(ptr2[40], 40);
let ptr3 = ptr.start_from(20);
assert_eq!(ptr3.len(), 30);
- assert_eq!(ptr3.start(), 20);
assert_eq!(ptr3[0], 20);
let ptr4 = ptr3.range(10, 10);
assert_eq!(ptr4.len(), 10);
- assert_eq!(ptr4.start(), 30);
assert_eq!(ptr4[0], 30);
let expected: Vec<u8> = (30..40).collect();
diff --git a/parquet/src/util/test_common/page_util.rs b/parquet/src/util/test_common/page_util.rs
index 1c0fd6283..ffa559f3f 100644
--- a/parquet/src/util/test_common/page_util.rs
+++ b/parquet/src/util/test_common/page_util.rs
@@ -25,13 +25,10 @@ use crate::encodings::levels::LevelEncoder;
use crate::errors::Result;
use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
use crate::util::memory::ByteBufferPtr;
-use crate::util::memory::MemTracker;
-use crate::util::memory::MemTrackerPtr;
use crate::util::test_common::random_numbers_range;
use rand::distributions::uniform::SampleUniform;
use std::collections::VecDeque;
use std::mem;
-use std::sync::Arc;
pub trait DataPageBuilder {
fn add_rep_levels(&mut self, max_level: i16, rep_levels: &[i16]);
@@ -50,7 +47,6 @@ pub trait DataPageBuilder {
pub struct DataPageBuilderImpl {
desc: ColumnDescPtr,
encoding: Option<Encoding>,
- mem_tracker: MemTrackerPtr,
num_values: u32,
buffer: Vec<u8>,
rep_levels_byte_len: u32,
@@ -66,7 +62,6 @@ impl DataPageBuilderImpl {
DataPageBuilderImpl {
desc,
encoding: None,
- mem_tracker: Arc::new(MemTracker::new()),
num_values,
buffer: vec![],
rep_levels_byte_len: 0,
@@ -122,7 +117,7 @@ impl DataPageBuilder for DataPageBuilderImpl {
);
self.encoding = Some(encoding);
let mut encoder: Box<dyn Encoder<T>> =
- get_encoder::<T>(self.desc.clone(), encoding,
self.mem_tracker.clone())
+ get_encoder::<T>(self.desc.clone(), encoding)
.expect("get_encoder() should be OK");
encoder.put(values).expect("put() should be OK");
let encoded_values = encoder
@@ -252,8 +247,7 @@ pub fn make_pages<T: DataType>(
let max_def_level = desc.max_def_level();
let max_rep_level = desc.max_rep_level();
- let mem_tracker = Arc::new(MemTracker::new());
- let mut dict_encoder = DictEncoder::<T>::new(desc.clone(), mem_tracker);
+ let mut dict_encoder = DictEncoder::<T>::new(desc.clone());
for i in 0..num_pages {
let mut num_values_cur_page = 0;