This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 28ca6d1ad9 Minor: name some constant values in arrow writer, parquet writer (#8642)
28ca6d1ad9 is described below
commit 28ca6d1ad9692d0f159ed1f1f45a20c0998a47ea
Author: Andrew Lamb <[email protected]>
AuthorDate: Wed Dec 27 10:08:39 2023 -0500
Minor: name some constant values in arrow writer, parquet writer (#8642)
* Minor: name some constant values in arrow writer
* Add constants to parquet.rs, update doc comments
* fix
---
datafusion/core/src/datasource/file_format/arrow.rs | 13 ++++++++++---
datafusion/core/src/datasource/file_format/avro.rs | 2 +-
datafusion/core/src/datasource/file_format/csv.rs | 2 +-
datafusion/core/src/datasource/file_format/json.rs | 2 +-
datafusion/core/src/datasource/file_format/parquet.rs | 19 +++++++++++++++----
5 files changed, 28 insertions(+), 10 deletions(-)
diff --git a/datafusion/core/src/datasource/file_format/arrow.rs b/datafusion/core/src/datasource/file_format/arrow.rs
index 7d393d9129..650f8c844e 100644
--- a/datafusion/core/src/datasource/file_format/arrow.rs
+++ b/datafusion/core/src/datasource/file_format/arrow.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-//! Apache Arrow format abstractions
+//! [`ArrowFormat`]: Apache Arrow [`FileFormat`] abstractions
//!
//! Works with files following the [Arrow IPC format](https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format)
@@ -58,6 +58,13 @@ use super::file_compression_type::FileCompressionType;
use super::write::demux::start_demuxer_task;
use super::write::{create_writer, SharedBuffer};
+/// Initial writing buffer size. Note this is just a size hint for efficiency. It
+/// will grow beyond the set value if needed.
+const INITIAL_BUFFER_BYTES: usize = 1048576;
+
+/// If the buffered Arrow data exceeds this size, it is flushed to object store
+const BUFFER_FLUSH_BYTES: usize = 1024000;
+
/// Arrow `FileFormat` implementation.
#[derive(Default, Debug)]
pub struct ArrowFormat;
@@ -239,7 +246,7 @@ impl DataSink for ArrowFileSink {
IpcWriteOptions::try_new(64, false, arrow_ipc::MetadataVersion::V5)?
.try_with_compression(Some(CompressionType::LZ4_FRAME))?;
while let Some((path, mut rx)) = file_stream_rx.recv().await {
- let shared_buffer = SharedBuffer::new(1048576);
+ let shared_buffer = SharedBuffer::new(INITIAL_BUFFER_BYTES);
let mut arrow_writer =
arrow_ipc::writer::FileWriter::try_new_with_options(
shared_buffer.clone(),
&self.get_writer_schema(),
@@ -257,7 +264,7 @@ impl DataSink for ArrowFileSink {
row_count += batch.num_rows();
arrow_writer.write(&batch)?;
let mut buff_to_flush = shared_buffer.buffer.try_lock().unwrap();
- if buff_to_flush.len() > 1024000 {
+ if buff_to_flush.len() > BUFFER_FLUSH_BYTES {
object_store_writer
.write_all(buff_to_flush.as_slice())
.await?;
diff --git a/datafusion/core/src/datasource/file_format/avro.rs b/datafusion/core/src/datasource/file_format/avro.rs
index a24a28ad6f..6d424bf0b2 100644
--- a/datafusion/core/src/datasource/file_format/avro.rs
+++ b/datafusion/core/src/datasource/file_format/avro.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-//! Apache Avro format abstractions
+//! [`AvroFormat`] Apache Avro [`FileFormat`] abstractions
use std::any::Any;
use std::sync::Arc;
diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index df6689af6b..4033bcd3b5 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-//! CSV format abstractions
+//! [`CsvFormat`], Comma Separated Value (CSV) [`FileFormat`] abstractions
use std::any::Any;
use std::collections::HashSet;
diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs
index 9893a1db45..fcb1d5f8e5 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-//! Line delimited JSON format abstractions
+//! [`JsonFormat`]: Line delimited JSON [`FileFormat`] abstractions
use std::any::Any;
use std::fmt;
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 0c813b6ccb..7044acccd6 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-//! Parquet format abstractions
+//! [`ParquetFormat`]: Parquet [`FileFormat`] abstractions
use arrow_array::RecordBatch;
use async_trait::async_trait;
@@ -75,6 +75,17 @@ use crate::physical_plan::{
Statistics,
};
+/// Size of the buffer for [`AsyncArrowWriter`].
+const PARQUET_WRITER_BUFFER_SIZE: usize = 10485760;
+
+/// Initial writing buffer size. Note this is just a size hint for efficiency. It
+/// will grow beyond the set value if needed.
+const INITIAL_BUFFER_BYTES: usize = 1048576;
+
+/// When writing parquet files in parallel, if the buffered Parquet data exceeds
+/// this size, it is flushed to object store
+const BUFFER_FLUSH_BYTES: usize = 1024000;
+
/// The Apache Parquet `FileFormat` implementation
///
/// Note it is recommended these are instead configured on the [`ConfigOptions`]
@@ -680,7 +691,7 @@ impl ParquetSink {
let writer = AsyncArrowWriter::try_new(
multipart_writer,
self.get_writer_schema(),
- 10485760,
+ PARQUET_WRITER_BUFFER_SIZE,
Some(parquet_props),
)?;
Ok(writer)
@@ -1004,7 +1015,7 @@ async fn concatenate_parallel_row_groups(
writer_props: Arc<WriterProperties>,
mut object_store_writer: AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
) -> Result<usize> {
- let merged_buff = SharedBuffer::new(1048576);
+ let merged_buff = SharedBuffer::new(INITIAL_BUFFER_BYTES);
let schema_desc = arrow_to_parquet_schema(schema.as_ref())?;
let mut parquet_writer = SerializedFileWriter::new(
@@ -1025,7 +1036,7 @@ async fn concatenate_parallel_row_groups(
for chunk in serialized_columns {
chunk.append_to_row_group(&mut rg_out)?;
let mut buff_to_flush = merged_buff.buffer.try_lock().unwrap();
- if buff_to_flush.len() > 1024000 {
+ if buff_to_flush.len() > BUFFER_FLUSH_BYTES {
object_store_writer
.write_all(buff_to_flush.as_slice())
.await?;