This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 74e2c5cd23 Make ArrowRowGroupWriter Public and SerializedRowGroupWriter Send (#4850)
74e2c5cd23 is described below

commit 74e2c5cd23070d6803ce1e0dbfb78693d463d1c2
Author: Devin D'Angelo <[email protected]>
AuthorDate: Mon Sep 25 07:31:00 2023 -0400

    Make ArrowRowGroupWriter Public and SerializedRowGroupWriter Send (#4850)
    
    * changes in support of async parallel parquet writer
    
    * rename ChainReader
    
    * cargo fmt
---
 parquet/src/arrow/arrow_writer/mod.rs | 20 +++++++++++---------
 parquet/src/file/writer.rs            |  3 ++-
 2 files changed, 13 insertions(+), 10 deletions(-)

diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs
index 5417ebe894..2e170738f1 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -248,7 +248,7 @@ impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {
 
 /// A list of [`Bytes`] comprising a single column chunk
 #[derive(Default)]
-struct ArrowColumnChunk {
+pub struct ArrowColumnChunk {
     length: usize,
     data: Vec<Bytes>,
 }
@@ -260,11 +260,13 @@ impl Length for ArrowColumnChunk {
 }
 
 impl ChunkReader for ArrowColumnChunk {
-    type T = ChainReader;
+    type T = ArrowColumnChunkReader;
 
     fn get_read(&self, start: u64) -> Result<Self::T> {
         assert_eq!(start, 0); // Assume append_column writes all data in one-shot
-        Ok(ChainReader(self.data.clone().into_iter().peekable()))
+        Ok(ArrowColumnChunkReader(
+            self.data.clone().into_iter().peekable(),
+        ))
     }
 
     fn get_bytes(&self, _start: u64, _length: usize) -> Result<Bytes> {
@@ -273,9 +275,9 @@ impl ChunkReader for ArrowColumnChunk {
 }
 
 /// A [`Read`] for an iterator of [`Bytes`]
-struct ChainReader(Peekable<IntoIter<Bytes>>);
+pub struct ArrowColumnChunkReader(Peekable<IntoIter<Bytes>>);
 
-impl Read for ChainReader {
+impl Read for ArrowColumnChunkReader {
     fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
         let buffer = loop {
             match self.0.peek_mut() {
@@ -362,14 +364,14 @@ impl ArrowColumnWriter {
 }
 
 /// Encodes [`RecordBatch`] to a parquet row group
-struct ArrowRowGroupWriter {
+pub struct ArrowRowGroupWriter {
     writers: Vec<(SharedColumnChunk, ArrowColumnWriter)>,
     schema: SchemaRef,
     buffered_rows: usize,
 }
 
 impl ArrowRowGroupWriter {
-    fn new(
+    pub fn new(
         parquet: &SchemaDescriptor,
         props: &WriterPropertiesPtr,
         arrow: &SchemaRef,
@@ -386,7 +388,7 @@ impl ArrowRowGroupWriter {
         })
     }
 
-    fn write(&mut self, batch: &RecordBatch) -> Result<()> {
+    pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
         self.buffered_rows += batch.num_rows();
         let mut writers = self.writers.iter_mut().map(|(_, x)| x);
         for (array, field) in batch.columns().iter().zip(&self.schema.fields) {
@@ -396,7 +398,7 @@ impl ArrowRowGroupWriter {
         Ok(())
     }
 
-    fn close(self) -> Result<Vec<(ArrowColumnChunk, ColumnCloseResult)>> {
+    pub fn close(self) -> Result<Vec<(ArrowColumnChunk, ColumnCloseResult)>> {
         self.writers
             .into_iter()
             .map(|(chunk, writer)| {
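
With ArrowRowGroupWriter, its new/write/close methods, and ArrowColumnChunk now public,
downstream code (such as an async parallel writer) can encode a row group's column chunks
itself and hand the results to the file writer afterwards. The following is a minimal sketch
only, not code from this commit; it assumes these items are reachable under
parquet::arrow::arrow_writer and that new takes exactly the three parameters visible in the
hunk above:

    // Sketch: drive the now-public ArrowRowGroupWriter directly.
    use arrow_array::RecordBatch;
    use arrow_schema::SchemaRef;
    use parquet::arrow::arrow_writer::{ArrowColumnChunk, ArrowRowGroupWriter};
    use parquet::column::writer::ColumnCloseResult;
    use parquet::errors::Result;
    use parquet::file::properties::WriterPropertiesPtr;
    use parquet::schema::types::SchemaDescriptor;

    /// Encode a slice of batches into one row group's worth of column chunks.
    fn encode_row_group(
        parquet: &SchemaDescriptor,
        props: &WriterPropertiesPtr,
        arrow: &SchemaRef,
        batches: &[RecordBatch],
    ) -> Result<Vec<(ArrowColumnChunk, ColumnCloseResult)>> {
        let mut writer = ArrowRowGroupWriter::new(parquet, props, arrow)?;
        for batch in batches {
            writer.write(batch)?; // encodes this batch's columns into the buffered chunks
        }
        // Each ArrowColumnChunk implements ChunkReader, so the results can later be
        // passed to SerializedRowGroupWriter::append_column on the thread that owns
        // the output file.
        writer.close()
    }
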
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index cafb176135..859a0aa1f9 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -115,7 +115,8 @@ pub type OnCloseRowGroup<'a> = Box<
             Vec<Option<ColumnIndex>>,
             Vec<Option<OffsetIndex>>,
         ) -> Result<()>
-        + 'a,
+        + 'a
+        + Send,
 >;
 
 // ----------------------------------------------------------------------
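
The added Send bound on the boxed on-close callback is what makes the row group writer
itself Send (as the commit title states), so an async parallel writer can move it onto
another thread or task. A minimal compile-time sketch of what this buys, not part of the
crate, assuming the underlying writer W is itself Send:

    // Sketch: assert that SerializedRowGroupWriter is Send once its
    // OnCloseRowGroup callback carries the new `+ Send` bound.
    use std::io::Write;

    use parquet::file::writer::SerializedRowGroupWriter;

    fn assert_send<T: Send>() {}

    fn _row_group_writer_is_send<W: Write + Send + 'static>() {
        assert_send::<SerializedRowGroupWriter<'static, W>>();
    }
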
