This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 28ca6d1ad9 Minor: name some constant values in arrow writer, parquet writer (#8642)
28ca6d1ad9 is described below

commit 28ca6d1ad9692d0f159ed1f1f45a20c0998a47ea
Author: Andrew Lamb <[email protected]>
AuthorDate: Wed Dec 27 10:08:39 2023 -0500

    Minor: name some constant values in arrow writer, parquet writer (#8642)
    
    * Minor: name some constant values in arrow writer
    
    * Add constants to parquet.rs, update doc comments
    
    * fix
---
 datafusion/core/src/datasource/file_format/arrow.rs   | 13 ++++++++++---
 datafusion/core/src/datasource/file_format/avro.rs    |  2 +-
 datafusion/core/src/datasource/file_format/csv.rs     |  2 +-
 datafusion/core/src/datasource/file_format/json.rs    |  2 +-
 datafusion/core/src/datasource/file_format/parquet.rs | 19 +++++++++++++++----
 5 files changed, 28 insertions(+), 10 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/arrow.rs b/datafusion/core/src/datasource/file_format/arrow.rs
index 7d393d9129..650f8c844e 100644
--- a/datafusion/core/src/datasource/file_format/arrow.rs
+++ b/datafusion/core/src/datasource/file_format/arrow.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Apache Arrow format abstractions
+//! [`ArrowFormat`]: Apache Arrow [`FileFormat`] abstractions
 //!
 //! Works with files following the [Arrow IPC format](https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format)
 
@@ -58,6 +58,13 @@ use super::file_compression_type::FileCompressionType;
 use super::write::demux::start_demuxer_task;
 use super::write::{create_writer, SharedBuffer};
 
+/// Initial writing buffer size. Note this is just a size hint for efficiency. It
+/// will grow beyond the set value if needed.
+const INITIAL_BUFFER_BYTES: usize = 1048576;
+
+/// If the buffered Arrow data exceeds this size, it is flushed to object store
+const BUFFER_FLUSH_BYTES: usize = 1024000;
+
 /// Arrow `FileFormat` implementation.
 #[derive(Default, Debug)]
 pub struct ArrowFormat;
@@ -239,7 +246,7 @@ impl DataSink for ArrowFileSink {
             IpcWriteOptions::try_new(64, false, arrow_ipc::MetadataVersion::V5)?
                 .try_with_compression(Some(CompressionType::LZ4_FRAME))?;
         while let Some((path, mut rx)) = file_stream_rx.recv().await {
-            let shared_buffer = SharedBuffer::new(1048576);
+            let shared_buffer = SharedBuffer::new(INITIAL_BUFFER_BYTES);
             let mut arrow_writer = arrow_ipc::writer::FileWriter::try_new_with_options(
                 shared_buffer.clone(),
                 &self.get_writer_schema(),
@@ -257,7 +264,7 @@ impl DataSink for ArrowFileSink {
                     row_count += batch.num_rows();
                     arrow_writer.write(&batch)?;
                     let mut buff_to_flush = shared_buffer.buffer.try_lock().unwrap();
-                    if buff_to_flush.len() > 1024000 {
+                    if buff_to_flush.len() > BUFFER_FLUSH_BYTES {
                         object_store_writer
                             .write_all(buff_to_flush.as_slice())
                             .await?;
diff --git a/datafusion/core/src/datasource/file_format/avro.rs b/datafusion/core/src/datasource/file_format/avro.rs
index a24a28ad6f..6d424bf0b2 100644
--- a/datafusion/core/src/datasource/file_format/avro.rs
+++ b/datafusion/core/src/datasource/file_format/avro.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Apache Avro format abstractions
+//! [`AvroFormat`] Apache Avro [`FileFormat`] abstractions
 
 use std::any::Any;
 use std::sync::Arc;
diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index df6689af6b..4033bcd3b5 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! CSV format abstractions
+//! [`CsvFormat`], Comma Separated Value (CSV) [`FileFormat`] abstractions
 
 use std::any::Any;
 use std::collections::HashSet;
diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs
index 9893a1db45..fcb1d5f8e5 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Line delimited JSON format abstractions
+//! [`JsonFormat`]: Line delimited JSON [`FileFormat`] abstractions
 
 use std::any::Any;
 use std::fmt;
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 0c813b6ccb..7044acccd6 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Parquet format abstractions
+//! [`ParquetFormat`]: Parquet [`FileFormat`] abstractions
 
 use arrow_array::RecordBatch;
 use async_trait::async_trait;
@@ -75,6 +75,17 @@ use crate::physical_plan::{
     Statistics,
 };
 
+/// Size of the buffer for [`AsyncArrowWriter`].
+const PARQUET_WRITER_BUFFER_SIZE: usize = 10485760;
+
+/// Initial writing buffer size. Note this is just a size hint for efficiency. It
+/// will grow beyond the set value if needed.
+
+/// When writing parquet files in parallel, if the buffered Parquet data exceeds
+/// this size, it is flushed to object store
+
 /// The Apache Parquet `FileFormat` implementation
 ///
 /// Note it is recommended these are instead configured on the [`ConfigOptions`]
@@ -680,7 +691,7 @@ impl ParquetSink {
         let writer = AsyncArrowWriter::try_new(
             multipart_writer,
             self.get_writer_schema(),
-            10485760,
+            PARQUET_WRITER_BUFFER_SIZE,
             Some(parquet_props),
         )?;
         Ok(writer)
@@ -1004,7 +1015,7 @@ async fn concatenate_parallel_row_groups(
     writer_props: Arc<WriterProperties>,
     mut object_store_writer: AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
 ) -> Result<usize> {
-    let merged_buff = SharedBuffer::new(1048576);
+    let merged_buff = SharedBuffer::new(INITIAL_BUFFER_BYTES);
 
     let schema_desc = arrow_to_parquet_schema(schema.as_ref())?;
     let mut parquet_writer = SerializedFileWriter::new(
@@ -1025,7 +1036,7 @@ async fn concatenate_parallel_row_groups(
                 for chunk in serialized_columns {
                     chunk.append_to_row_group(&mut rg_out)?;
                     let mut buff_to_flush = merged_buff.buffer.try_lock().unwrap();
-                    if buff_to_flush.len() > 1024000 {
+                    if buff_to_flush.len() > BUFFER_FLUSH_BYTES {
                         object_store_writer
                             .write_all(buff_to_flush.as_slice())
                             .await?;

Reply via email to