This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 74e2c5cd23 Make ArrowRowGroupWriter Public and
SerializedRowGroupWriter Send (#4850)
74e2c5cd23 is described below
commit 74e2c5cd23070d6803ce1e0dbfb78693d463d1c2
Author: Devin D'Angelo <[email protected]>
AuthorDate: Mon Sep 25 07:31:00 2023 -0400
Make ArrowRowGroupWriter Public and SerializedRowGroupWriter Send (#4850)
* changes in support of async parallel parquet writer
* rename ChainReader
* cargo fmt
---
parquet/src/arrow/arrow_writer/mod.rs | 20 +++++++++++---------
parquet/src/file/writer.rs | 3 ++-
2 files changed, 13 insertions(+), 10 deletions(-)
diff --git a/parquet/src/arrow/arrow_writer/mod.rs
b/parquet/src/arrow/arrow_writer/mod.rs
index 5417ebe894..2e170738f1 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -248,7 +248,7 @@ impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {
/// A list of [`Bytes`] comprising a single column chunk
#[derive(Default)]
-struct ArrowColumnChunk {
+pub struct ArrowColumnChunk {
length: usize,
data: Vec<Bytes>,
}
@@ -260,11 +260,13 @@ impl Length for ArrowColumnChunk {
}
impl ChunkReader for ArrowColumnChunk {
- type T = ChainReader;
+ type T = ArrowColumnChunkReader;
fn get_read(&self, start: u64) -> Result<Self::T> {
assert_eq!(start, 0); // Assume append_column writes all data in
one-shot
- Ok(ChainReader(self.data.clone().into_iter().peekable()))
+ Ok(ArrowColumnChunkReader(
+ self.data.clone().into_iter().peekable(),
+ ))
}
fn get_bytes(&self, _start: u64, _length: usize) -> Result<Bytes> {
@@ -273,9 +275,9 @@ impl ChunkReader for ArrowColumnChunk {
}
/// A [`Read`] for an iterator of [`Bytes`]
-struct ChainReader(Peekable<IntoIter<Bytes>>);
+pub struct ArrowColumnChunkReader(Peekable<IntoIter<Bytes>>);
-impl Read for ChainReader {
+impl Read for ArrowColumnChunkReader {
fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
let buffer = loop {
match self.0.peek_mut() {
@@ -362,14 +364,14 @@ impl ArrowColumnWriter {
}
/// Encodes [`RecordBatch`] to a parquet row group
-struct ArrowRowGroupWriter {
+pub struct ArrowRowGroupWriter {
writers: Vec<(SharedColumnChunk, ArrowColumnWriter)>,
schema: SchemaRef,
buffered_rows: usize,
}
impl ArrowRowGroupWriter {
- fn new(
+ pub fn new(
parquet: &SchemaDescriptor,
props: &WriterPropertiesPtr,
arrow: &SchemaRef,
@@ -386,7 +388,7 @@ impl ArrowRowGroupWriter {
})
}
- fn write(&mut self, batch: &RecordBatch) -> Result<()> {
+ pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
self.buffered_rows += batch.num_rows();
let mut writers = self.writers.iter_mut().map(|(_, x)| x);
for (array, field) in batch.columns().iter().zip(&self.schema.fields) {
@@ -396,7 +398,7 @@ impl ArrowRowGroupWriter {
Ok(())
}
- fn close(self) -> Result<Vec<(ArrowColumnChunk, ColumnCloseResult)>> {
+ pub fn close(self) -> Result<Vec<(ArrowColumnChunk, ColumnCloseResult)>> {
self.writers
.into_iter()
.map(|(chunk, writer)| {
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index cafb176135..859a0aa1f9 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -115,7 +115,8 @@ pub type OnCloseRowGroup<'a> = Box<
Vec<Option<ColumnIndex>>,
Vec<Option<OffsetIndex>>,
) -> Result<()>
- + 'a,
+ + 'a
+ + Send,
>;
// ----------------------------------------------------------------------