This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new aa799f0d0 Make GenericColumnWriter Send (#4287)
aa799f0d0 is described below
commit aa799f0d03c42b59d5accacf87b6bda4cd36ceae
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Fri May 26 23:04:53 2023 +0100
Make GenericColumnWriter Send (#4287)
---
parquet/src/arrow/arrow_writer/mod.rs | 6 +++---
parquet/src/column/page.rs | 2 +-
parquet/src/column/writer/mod.rs | 6 ++++++
parquet/src/encodings/encoding/mod.rs | 2 +-
parquet/src/file/writer.rs | 10 +++++-----
parquet/src/record/record_writer.rs | 2 +-
parquet_derive/src/lib.rs | 2 +-
parquet_derive_test/src/lib.rs | 3 +--
8 files changed, 19 insertions(+), 14 deletions(-)
diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs
index 08cfc7ea3..616968bf6 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -117,7 +117,7 @@ impl<W: Write> Debug for ArrowWriter<W> {
}
}
-impl<W: Write> ArrowWriter<W> {
+impl<W: Write + Send> ArrowWriter<W> {
/// Try to create a new Arrow writer
///
/// The writer will fail if:
@@ -273,7 +273,7 @@ impl<W: Write> ArrowWriter<W> {
}
}
-impl<W: Write> RecordBatchWriter for ArrowWriter<W> {
+impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {
fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
self.write(batch).map_err(|e| e.into())
}
@@ -284,7 +284,7 @@ impl<W: Write> RecordBatchWriter for ArrowWriter<W> {
}
}
-fn write_leaves<W: Write>(
+fn write_leaves<W: Write + Send>(
row_group_writer: &mut SerializedRowGroupWriter<'_, W>,
arrays: &[ArrayRef],
levels: &mut [Vec<LevelInfo>],
diff --git a/parquet/src/column/page.rs b/parquet/src/column/page.rs
index bd3568d13..f854e5cac 100644
--- a/parquet/src/column/page.rs
+++ b/parquet/src/column/page.rs
@@ -248,7 +248,7 @@ pub trait PageReader: Iterator<Item = Result<Page>> + Send {
///
/// It is reasonable to assume that all pages will be written in the correct order, e.g.
/// dictionary page followed by data pages, or a set of data pages, etc.
-pub trait PageWriter {
+pub trait PageWriter: Send {
/// Writes a page into the output stream/sink.
/// Returns `PageWriteSpec` that contains information about written page metrics,
/// including number of bytes, size, number of values, offset, etc.
diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index 3fcfe6c19..bf77b2b32 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -2174,6 +2174,12 @@ mod tests {
);
}
+ #[test]
+ fn test_send() {
+ fn test<T: Send>() {}
+ test::<ColumnWriterImpl<Int32Type>>();
+ }
+
/// Performs write-read roundtrip with randomly generated values and levels.
/// `max_size` is maximum number of values or levels (if `max_def_level` > 0) to write
/// for a column.
diff --git a/parquet/src/encodings/encoding/mod.rs b/parquet/src/encodings/encoding/mod.rs
index b7e30c4ec..3088f3321 100644
--- a/parquet/src/encodings/encoding/mod.rs
+++ b/parquet/src/encodings/encoding/mod.rs
@@ -40,7 +40,7 @@ mod dict_encoder;
///
/// Currently this allocates internal buffers for the encoded values. After done putting
/// values, caller should call `flush_buffer()` to get an immutable buffer pointer.
-pub trait Encoder<T: DataType> {
+pub trait Encoder<T: DataType>: Send {
/// Encodes data from `values`.
fn put(&mut self, values: &[T::T]) -> Result<()>;
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index c1c8db955..4f15c9f4b 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -159,7 +159,7 @@ impl<W: Write> Debug for SerializedFileWriter<W> {
}
}
-impl<W: Write> SerializedFileWriter<W> {
+impl<W: Write + Send> SerializedFileWriter<W> {
/// Creates new file writer.
pub fn new(buf: W, schema: TypePtr, properties: WriterPropertiesPtr) -> Result<Self> {
let mut buf = TrackedWrite::new(buf);
@@ -405,7 +405,7 @@ pub struct SerializedRowGroupWriter<'a, W: Write> {
on_close: Option<OnCloseRowGroup<'a>>,
}
-impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
+impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
/// Creates a new `SerializedRowGroupWriter` with:
///
/// - `schema_descr` - the schema to write
@@ -699,7 +699,7 @@ impl<'a, W: Write> SerializedPageWriter<'a, W> {
}
}
-impl<'a, W: Write> PageWriter for SerializedPageWriter<'a, W> {
+impl<'a, W: Write + Send> PageWriter for SerializedPageWriter<'a, W> {
fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
let uncompressed_size = page.uncompressed_size();
let compressed_size = page.compressed_size();
@@ -1332,7 +1332,7 @@ mod tests {
compression: Compression,
) -> crate::format::FileMetaData
where
- W: Write,
+ W: Write + Send,
R: ChunkReader + From<W> + 'static,
{
test_roundtrip::<W, R, Int32Type, _>(
@@ -1352,7 +1352,7 @@ mod tests {
compression: Compression,
) -> crate::format::FileMetaData
where
- W: Write,
+ W: Write + Send,
R: ChunkReader + From<W> + 'static,
D: DataType,
F: Fn(Row) -> D::T,
diff --git a/parquet/src/record/record_writer.rs b/parquet/src/record/record_writer.rs
index fe803a7ff..62099051f 100644
--- a/parquet/src/record/record_writer.rs
+++ b/parquet/src/record/record_writer.rs
@@ -21,7 +21,7 @@ use super::super::errors::ParquetError;
use super::super::file::writer::SerializedRowGroupWriter;
pub trait RecordWriter<T> {
- fn write_to_row_group<W: std::io::Write>(
+ fn write_to_row_group<W: std::io::Write + Send>(
&self,
row_group_writer: &mut SerializedRowGroupWriter<W>,
) -> Result<(), ParquetError>;
diff --git a/parquet_derive/src/lib.rs b/parquet_derive/src/lib.rs
index a09b3b652..0f875401f 100644
--- a/parquet_derive/src/lib.rs
+++ b/parquet_derive/src/lib.rs
@@ -96,7 +96,7 @@ pub fn parquet_record_writer(input: proc_macro::TokenStream) -> proc_macro::Toke
(quote! {
impl #generics ::parquet::record::RecordWriter<#derived_for #generics> for &[#derived_for #generics] {
- fn write_to_row_group<W: ::std::io::Write>(
+ fn write_to_row_group<W: ::std::io::Write + Send>(
&self,
row_group_writer: &mut ::parquet::file::writer::SerializedRowGroupWriter<'_, W>
) -> Result<(), ::parquet::errors::ParquetError> {
diff --git a/parquet_derive_test/src/lib.rs b/parquet_derive_test/src/lib.rs
index d2cf9efb1..f4f8be1e0 100644
--- a/parquet_derive_test/src/lib.rs
+++ b/parquet_derive_test/src/lib.rs
@@ -56,8 +56,7 @@ mod tests {
use std::{env, fs, io::Write, sync::Arc};
use parquet::{
- file::{properties::WriterProperties, writer::SerializedFileWriter},
- record::RecordWriter,
+ file::writer::SerializedFileWriter, record::RecordWriter,
schema::parser::parse_message_type,
};