This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new aa799f0d0 Make GenericColumnWriter Send (#4287)
aa799f0d0 is described below

commit aa799f0d03c42b59d5accacf87b6bda4cd36ceae
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Fri May 26 23:04:53 2023 +0100

    Make GenericColumnWriter Send (#4287)
---
 parquet/src/arrow/arrow_writer/mod.rs |  6 +++---
 parquet/src/column/page.rs            |  2 +-
 parquet/src/column/writer/mod.rs      |  6 ++++++
 parquet/src/encodings/encoding/mod.rs |  2 +-
 parquet/src/file/writer.rs            | 10 +++++-----
 parquet/src/record/record_writer.rs   |  2 +-
 parquet_derive/src/lib.rs             |  2 +-
 parquet_derive_test/src/lib.rs        |  3 +--
 8 files changed, 19 insertions(+), 14 deletions(-)

diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs
index 08cfc7ea3..616968bf6 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -117,7 +117,7 @@ impl<W: Write> Debug for ArrowWriter<W> {
     }
 }
 
-impl<W: Write> ArrowWriter<W> {
+impl<W: Write + Send> ArrowWriter<W> {
     /// Try to create a new Arrow writer
     ///
     /// The writer will fail if:
@@ -273,7 +273,7 @@ impl<W: Write> ArrowWriter<W> {
     }
 }
 
-impl<W: Write> RecordBatchWriter for ArrowWriter<W> {
+impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {
     fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
         self.write(batch).map_err(|e| e.into())
     }
@@ -284,7 +284,7 @@ impl<W: Write> RecordBatchWriter for ArrowWriter<W> {
     }
 }
 
-fn write_leaves<W: Write>(
+fn write_leaves<W: Write + Send>(
     row_group_writer: &mut SerializedRowGroupWriter<'_, W>,
     arrays: &[ArrayRef],
     levels: &mut [Vec<LevelInfo>],
diff --git a/parquet/src/column/page.rs b/parquet/src/column/page.rs
index bd3568d13..f854e5cac 100644
--- a/parquet/src/column/page.rs
+++ b/parquet/src/column/page.rs
@@ -248,7 +248,7 @@ pub trait PageReader: Iterator<Item = Result<Page>> + Send {
 ///
 /// It is reasonable to assume that all pages will be written in the correct order, e.g.
 /// dictionary page followed by data pages, or a set of data pages, etc.
-pub trait PageWriter {
+pub trait PageWriter: Send {
     /// Writes a page into the output stream/sink.
     /// Returns `PageWriteSpec` that contains information about written page metrics,
     /// including number of bytes, size, number of values, offset, etc.
diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index 3fcfe6c19..bf77b2b32 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -2174,6 +2174,12 @@ mod tests {
         );
     }
 
+    #[test]
+    fn test_send() {
+        fn test<T: Send>() {}
+        test::<ColumnWriterImpl<Int32Type>>();
+    }
+
     /// Performs write-read roundtrip with randomly generated values and levels.
     /// `max_size` is maximum number of values or levels (if `max_def_level` > 0) to write
     /// for a column.
diff --git a/parquet/src/encodings/encoding/mod.rs b/parquet/src/encodings/encoding/mod.rs
index b7e30c4ec..3088f3321 100644
--- a/parquet/src/encodings/encoding/mod.rs
+++ b/parquet/src/encodings/encoding/mod.rs
@@ -40,7 +40,7 @@ mod dict_encoder;
 ///
 /// Currently this allocates internal buffers for the encoded values. After done putting
 /// values, caller should call `flush_buffer()` to get an immutable buffer pointer.
-pub trait Encoder<T: DataType> {
+pub trait Encoder<T: DataType>: Send {
     /// Encodes data from `values`.
     fn put(&mut self, values: &[T::T]) -> Result<()>;
 
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index c1c8db955..4f15c9f4b 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -159,7 +159,7 @@ impl<W: Write> Debug for SerializedFileWriter<W> {
     }
 }
 
-impl<W: Write> SerializedFileWriter<W> {
+impl<W: Write + Send> SerializedFileWriter<W> {
     /// Creates new file writer.
     pub fn new(buf: W, schema: TypePtr, properties: WriterPropertiesPtr) -> 
Result<Self> {
         let mut buf = TrackedWrite::new(buf);
@@ -405,7 +405,7 @@ pub struct SerializedRowGroupWriter<'a, W: Write> {
     on_close: Option<OnCloseRowGroup<'a>>,
 }
 
-impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
+impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
     /// Creates a new `SerializedRowGroupWriter` with:
     ///
     /// - `schema_descr` - the schema to write
@@ -699,7 +699,7 @@ impl<'a, W: Write> SerializedPageWriter<'a, W> {
     }
 }
 
-impl<'a, W: Write> PageWriter for SerializedPageWriter<'a, W> {
+impl<'a, W: Write + Send> PageWriter for SerializedPageWriter<'a, W> {
     fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
         let uncompressed_size = page.uncompressed_size();
         let compressed_size = page.compressed_size();
@@ -1332,7 +1332,7 @@ mod tests {
         compression: Compression,
     ) -> crate::format::FileMetaData
     where
-        W: Write,
+        W: Write + Send,
         R: ChunkReader + From<W> + 'static,
     {
         test_roundtrip::<W, R, Int32Type, _>(
@@ -1352,7 +1352,7 @@ mod tests {
         compression: Compression,
     ) -> crate::format::FileMetaData
     where
-        W: Write,
+        W: Write + Send,
         R: ChunkReader + From<W> + 'static,
         D: DataType,
         F: Fn(Row) -> D::T,
diff --git a/parquet/src/record/record_writer.rs b/parquet/src/record/record_writer.rs
index fe803a7ff..62099051f 100644
--- a/parquet/src/record/record_writer.rs
+++ b/parquet/src/record/record_writer.rs
@@ -21,7 +21,7 @@ use super::super::errors::ParquetError;
 use super::super::file::writer::SerializedRowGroupWriter;
 
 pub trait RecordWriter<T> {
-    fn write_to_row_group<W: std::io::Write>(
+    fn write_to_row_group<W: std::io::Write + Send>(
         &self,
         row_group_writer: &mut SerializedRowGroupWriter<W>,
     ) -> Result<(), ParquetError>;
diff --git a/parquet_derive/src/lib.rs b/parquet_derive/src/lib.rs
index a09b3b652..0f875401f 100644
--- a/parquet_derive/src/lib.rs
+++ b/parquet_derive/src/lib.rs
@@ -96,7 +96,7 @@ pub fn parquet_record_writer(input: proc_macro::TokenStream) -> proc_macro::Toke
 
     (quote! {
     impl #generics ::parquet::record::RecordWriter<#derived_for #generics> for 
&[#derived_for #generics] {
-      fn write_to_row_group<W: ::std::io::Write>(
+      fn write_to_row_group<W: ::std::io::Write + Send>(
         &self,
         row_group_writer: &mut 
::parquet::file::writer::SerializedRowGroupWriter<'_, W>
       ) -> Result<(), ::parquet::errors::ParquetError> {
diff --git a/parquet_derive_test/src/lib.rs b/parquet_derive_test/src/lib.rs
index d2cf9efb1..f4f8be1e0 100644
--- a/parquet_derive_test/src/lib.rs
+++ b/parquet_derive_test/src/lib.rs
@@ -56,8 +56,7 @@ mod tests {
     use std::{env, fs, io::Write, sync::Arc};
 
     use parquet::{
-        file::{properties::WriterProperties, writer::SerializedFileWriter},
-        record::RecordWriter,
+        file::writer::SerializedFileWriter, record::RecordWriter,
         schema::parser::parse_message_type,
     };
 

Reply via email to