This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 7c6fdcc683 DataSink Dynamic Execution Time Demux (#7791)
7c6fdcc683 is described below
commit 7c6fdcc6839f06bd0f7981bdcd45b01200a41db3
Author: Devin D'Angelo <[email protected]>
AuthorDate: Tue Oct 17 20:39:26 2023 -0400
DataSink Dynamic Execution Time Demux (#7791)
* dynamic partition
* linting
* update docs
* fix all tests
* cargo doc fix
* test correct number of files
* add asci art and reduce buffer size
* fix config tests
* review comments
* cleanup post rebase
* fix test
---
datafusion/common/src/config.rs | 18 +
datafusion/core/src/datasource/file_format/csv.rs | 192 ++++----
datafusion/core/src/datasource/file_format/json.rs | 174 +++----
.../core/src/datasource/file_format/parquet.rs | 251 ++++------
.../core/src/datasource/file_format/write.rs | 515 ++++++++++++++++-----
datafusion/core/src/datasource/listing/table.rs | 22 +-
datafusion/core/src/datasource/memory.rs | 14 +-
.../core/src/datasource/physical_plan/parquet.rs | 19 +-
datafusion/physical-plan/src/insert.rs | 30 +-
.../sqllogictest/test_files/information_schema.slt | 6 +
datafusion/sqllogictest/test_files/insert.slt | 15 +-
.../sqllogictest/test_files/insert_to_external.slt | 22 +-
docs/source/user-guide/configs.md | 3 +
13 files changed, 720 insertions(+), 561 deletions(-)
diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index 281da1f69e..86e6221c3b 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -254,6 +254,24 @@ config_namespace! {
/// Number of files to read in parallel when inferring schema and
statistics
pub meta_fetch_concurrency: usize, default = 32
+
+ /// Target number of rows in output files when writing multiple.
+ /// This is a soft max, so it can be exceeded slightly. There also
+ /// will be one file smaller than the limit if the total
+ /// number of rows written is not roughly divisible by the soft max
+ pub soft_max_rows_per_output_file: usize, default = 50000000
+
+ /// This is the maximum number of output files being written
+ /// in parallel. Higher values can potentially give faster write
+ /// performance at the cost of higher peak memory consumption.
+ pub max_parallel_ouput_files: usize, default = 8
+
+ /// This is the maximum number of RecordBatches buffered
+ /// for each output file being worked. Higher values can potentially
+ /// give faster write performance at the cost of higher peak
+ /// memory consumption
+ pub max_buffered_batches_per_output_file: usize, default = 2
+
}
}
diff --git a/datafusion/core/src/datasource/file_format/csv.rs
b/datafusion/core/src/datasource/file_format/csv.rs
index 41265ede7f..bc01b29ba0 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -23,11 +23,10 @@ use std::fmt;
use std::fmt::Debug;
use std::sync::Arc;
+use super::write::{stateless_append_all, stateless_multipart_put};
use super::{FileFormat, DEFAULT_SCHEMA_INFER_MAX_RECORD};
use crate::datasource::file_format::file_compression_type::FileCompressionType;
-use crate::datasource::file_format::write::{
- create_writer, stateless_serialize_and_write_files, BatchSerializer,
FileWriterMode,
-};
+use crate::datasource::file_format::write::{BatchSerializer, FileWriterMode};
use crate::datasource::physical_plan::{
CsvExec, FileGroupDisplay, FileScanConfig, FileSinkConfig,
};
@@ -51,7 +50,6 @@ use bytes::{Buf, Bytes};
use futures::stream::BoxStream;
use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
use object_store::{delimited::newline_delimited_stream, ObjectMeta,
ObjectStore};
-use rand::distributions::{Alphanumeric, DistString};
/// Character Separated Value `FileFormat` implementation.
#[derive(Debug)]
@@ -481,6 +479,82 @@ impl CsvSink {
fn new(config: FileSinkConfig) -> Self {
Self { config }
}
+
+ async fn append_all(
+ &self,
+ data: SendableRecordBatchStream,
+ context: &Arc<TaskContext>,
+ ) -> Result<u64> {
+ let writer_options =
self.config.file_type_writer_options.try_into_csv()?;
+ let (builder, compression) =
+ (&writer_options.writer_options, &writer_options.compression);
+ let compression = FileCompressionType::from(*compression);
+
+ let object_store = context
+ .runtime_env()
+ .object_store(&self.config.object_store_url)?;
+ let file_groups = &self.config.file_groups;
+
+ let builder_clone = builder.clone();
+ let options_clone = writer_options.clone();
+ let get_serializer = move |file_size| {
+ let inner_clone = builder_clone.clone();
+ // In append mode, consider has_header flag only when file is
empty (at the start).
+ // For other modes, use has_header flag as is.
+ let serializer: Box<dyn BatchSerializer> = Box::new(if file_size >
0 {
+ CsvSerializer::new()
+ .with_builder(inner_clone)
+ .with_header(false)
+ } else {
+ CsvSerializer::new()
+ .with_builder(inner_clone)
+ .with_header(options_clone.has_header)
+ });
+ serializer
+ };
+
+ stateless_append_all(
+ data,
+ context,
+ object_store,
+ file_groups,
+ self.config.unbounded_input,
+ compression,
+ Box::new(get_serializer),
+ )
+ .await
+ }
+
+ async fn multipartput_all(
+ &self,
+ data: SendableRecordBatchStream,
+ context: &Arc<TaskContext>,
+ ) -> Result<u64> {
+ let writer_options =
self.config.file_type_writer_options.try_into_csv()?;
+ let builder = &writer_options.writer_options;
+
+ let builder_clone = builder.clone();
+ let options_clone = writer_options.clone();
+ let get_serializer = move || {
+ let inner_clone = builder_clone.clone();
+ let serializer: Box<dyn BatchSerializer> = Box::new(
+ CsvSerializer::new()
+ .with_builder(inner_clone)
+ .with_header(options_clone.has_header),
+ );
+ serializer
+ };
+
+ stateless_multipart_put(
+ data,
+ context,
+ "csv".into(),
+ Box::new(get_serializer),
+ &self.config,
+ writer_options.compression.into(),
+ )
+ .await
+ }
}
#[async_trait]
@@ -495,116 +569,22 @@ impl DataSink for CsvSink {
async fn write_all(
&self,
- data: Vec<SendableRecordBatchStream>,
+ data: SendableRecordBatchStream,
context: &Arc<TaskContext>,
) -> Result<u64> {
- let num_partitions = data.len();
- let writer_options =
self.config.file_type_writer_options.try_into_csv()?;
- let (builder, compression) =
- (&writer_options.writer_options, &writer_options.compression);
- let mut has_header = writer_options.has_header;
- let compression = FileCompressionType::from(*compression);
-
- let object_store = context
- .runtime_env()
- .object_store(&self.config.object_store_url)?;
- // Construct serializer and writer for each file group
- let mut serializers: Vec<Box<dyn BatchSerializer>> = vec![];
- let mut writers = vec![];
match self.config.writer_mode {
FileWriterMode::Append => {
- for file_group in &self.config.file_groups {
- let mut append_builder = builder.clone();
- // In append mode, consider has_header flag only when file
is empty (at the start).
- // For other modes, use has_header flag as is.
- if file_group.object_meta.size != 0 {
- has_header = false;
- append_builder = append_builder.has_headers(false);
- }
- let serializer = CsvSerializer::new()
- .with_builder(append_builder)
- .with_header(has_header);
- serializers.push(Box::new(serializer));
-
- let file = file_group.clone();
- let writer = create_writer(
- self.config.writer_mode,
- compression,
- file.object_meta.clone().into(),
- object_store.clone(),
- )
- .await?;
- writers.push(writer);
- }
- }
- FileWriterMode::Put => {
- return not_impl_err!("Put Mode is not implemented for CSV Sink
yet")
+ let total_count = self.append_all(data, context).await?;
+ Ok(total_count)
}
FileWriterMode::PutMultipart => {
- // Currently assuming only 1 partition path (i.e. not
hive-style partitioning on a column)
- let base_path = &self.config.table_paths[0];
- match self.config.single_file_output {
- false => {
- // Uniquely identify this batch of files with a random
string, to prevent collisions overwriting files
- let write_id =
- Alphanumeric.sample_string(&mut
rand::thread_rng(), 16);
- for part_idx in 0..num_partitions {
- let serializer = CsvSerializer::new()
- .with_builder(builder.clone())
- .with_header(has_header);
- serializers.push(Box::new(serializer));
- let file_path = base_path
- .prefix()
- .child(format!("{}_{}.csv", write_id,
part_idx));
- let object_meta = ObjectMeta {
- location: file_path,
- last_modified: chrono::offset::Utc::now(),
- size: 0,
- e_tag: None,
- };
- let writer = create_writer(
- self.config.writer_mode,
- compression,
- object_meta.into(),
- object_store.clone(),
- )
- .await?;
- writers.push(writer);
- }
- }
- true => {
- let serializer = CsvSerializer::new()
- .with_builder(builder.clone())
- .with_header(has_header);
- serializers.push(Box::new(serializer));
- let file_path = base_path.prefix();
- let object_meta = ObjectMeta {
- location: file_path.clone(),
- last_modified: chrono::offset::Utc::now(),
- size: 0,
- e_tag: None,
- };
- let writer = create_writer(
- self.config.writer_mode,
- compression,
- object_meta.into(),
- object_store.clone(),
- )
- .await?;
- writers.push(writer);
- }
- }
+ let total_count = self.multipartput_all(data, context).await?;
+ Ok(total_count)
+ }
+ FileWriterMode::Put => {
+ return not_impl_err!("FileWriterMode::Put is not supported
yet!")
}
}
-
- stateless_serialize_and_write_files(
- data,
- serializers,
- writers,
- self.config.single_file_output,
- self.config.unbounded_input,
- )
- .await
}
}
diff --git a/datafusion/core/src/datasource/file_format/json.rs
b/datafusion/core/src/datasource/file_format/json.rs
index dc5b24b2ea..e1f8ab0d57 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -23,11 +23,10 @@ use std::fmt::Debug;
use std::io::BufReader;
use std::sync::Arc;
+use super::write::{stateless_append_all, stateless_multipart_put};
use super::{FileFormat, FileScanConfig};
use crate::datasource::file_format::file_compression_type::FileCompressionType;
-use crate::datasource::file_format::write::{
- create_writer, stateless_serialize_and_write_files, BatchSerializer,
FileWriterMode,
-};
+use crate::datasource::file_format::write::{BatchSerializer, FileWriterMode};
use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
use crate::datasource::physical_plan::{FileGroupDisplay, FileSinkConfig,
NdJsonExec};
use crate::error::Result;
@@ -49,7 +48,6 @@ use datafusion_physical_plan::metrics::MetricsSet;
use async_trait::async_trait;
use bytes::{Buf, Bytes};
use object_store::{GetResultPayload, ObjectMeta, ObjectStore};
-use rand::distributions::{Alphanumeric, DistString};
/// New line delimited JSON `FileFormat` implementation.
#[derive(Debug)]
@@ -172,7 +170,7 @@ impl FileFormat for JsonFormat {
return not_impl_err!("Inserting compressed JSON is not implemented
yet.");
}
let sink_schema = conf.output_schema().clone();
- let sink = Arc::new(JsonSink::new(conf, self.file_compression_type));
+ let sink = Arc::new(JsonSink::new(conf));
Ok(Arc::new(FileSinkExec::new(
input,
@@ -226,14 +224,11 @@ impl BatchSerializer for JsonSerializer {
struct JsonSink {
/// Config options for writing data
config: FileSinkConfig,
- file_compression_type: FileCompressionType,
}
impl Debug for JsonSink {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.debug_struct("JsonSink")
- .field("file_compression_type", &self.file_compression_type)
- .finish()
+ f.debug_struct("JsonSink").finish()
}
}
@@ -254,11 +249,62 @@ impl DisplayAs for JsonSink {
}
impl JsonSink {
- fn new(config: FileSinkConfig, file_compression_type: FileCompressionType)
-> Self {
- Self {
- config,
- file_compression_type,
- }
+ fn new(config: FileSinkConfig) -> Self {
+ Self { config }
+ }
+
+ async fn append_all(
+ &self,
+ data: SendableRecordBatchStream,
+ context: &Arc<TaskContext>,
+ ) -> Result<u64> {
+ let writer_options =
self.config.file_type_writer_options.try_into_json()?;
+ let compression = &writer_options.compression;
+
+ let object_store = context
+ .runtime_env()
+ .object_store(&self.config.object_store_url)?;
+ let file_groups = &self.config.file_groups;
+
+ let get_serializer = move |_| {
+ let serializer: Box<dyn BatchSerializer> =
Box::new(JsonSerializer::new());
+ serializer
+ };
+
+ stateless_append_all(
+ data,
+ context,
+ object_store,
+ file_groups,
+ self.config.unbounded_input,
+ (*compression).into(),
+ Box::new(get_serializer),
+ )
+ .await
+ }
+
+ async fn multipartput_all(
+ &self,
+ data: SendableRecordBatchStream,
+ context: &Arc<TaskContext>,
+ ) -> Result<u64> {
+ let writer_options =
self.config.file_type_writer_options.try_into_json()?;
+ let compression = &writer_options.compression;
+
+ let get_serializer = move || {
+ let serializer: Box<dyn BatchSerializer> =
Box::new(JsonSerializer::new());
+ serializer
+ };
+
+ stateless_multipart_put(
+ data,
+ context,
+ "json".into(),
+ Box::new(get_serializer),
+ &self.config,
+ (*compression).into(),
+ )
+ .await
}
}
@@ -274,106 +320,22 @@ impl DataSink for JsonSink {
async fn write_all(
&self,
- data: Vec<SendableRecordBatchStream>,
+ data: SendableRecordBatchStream,
context: &Arc<TaskContext>,
) -> Result<u64> {
- let num_partitions = data.len();
-
- let object_store = context
- .runtime_env()
- .object_store(&self.config.object_store_url)?;
-
- let writer_options =
self.config.file_type_writer_options.try_into_json()?;
-
- let compression =
FileCompressionType::from(writer_options.compression);
-
- // Construct serializer and writer for each file group
- let mut serializers: Vec<Box<dyn BatchSerializer>> = vec![];
- let mut writers = vec![];
match self.config.writer_mode {
FileWriterMode::Append => {
- if self.config.single_file_output {
- return
Err(DataFusionError::NotImplemented("single_file_output=true is not implemented
for JsonSink in Append mode".into()));
- }
- for file_group in &self.config.file_groups {
- let serializer = JsonSerializer::new();
- serializers.push(Box::new(serializer));
-
- let file = file_group.clone();
- let writer = create_writer(
- self.config.writer_mode,
- compression,
- file.object_meta.clone().into(),
- object_store.clone(),
- )
- .await?;
- writers.push(writer);
- }
- }
- FileWriterMode::Put => {
- return not_impl_err!("Put Mode is not implemented for Json
Sink yet")
+ let total_count = self.append_all(data, context).await?;
+ Ok(total_count)
}
FileWriterMode::PutMultipart => {
- // Currently assuming only 1 partition path (i.e. not
hive-style partitioning on a column)
- let base_path = &self.config.table_paths[0];
- match self.config.single_file_output {
- false => {
- // Uniquely identify this batch of files with a random
string, to prevent collisions overwriting files
- let write_id =
- Alphanumeric.sample_string(&mut
rand::thread_rng(), 16);
- for part_idx in 0..num_partitions {
- let serializer = JsonSerializer::new();
- serializers.push(Box::new(serializer));
- let file_path = base_path
- .prefix()
- .child(format!("{}_{}.json", write_id,
part_idx));
- let object_meta = ObjectMeta {
- location: file_path,
- last_modified: chrono::offset::Utc::now(),
- size: 0,
- e_tag: None,
- };
- let writer = create_writer(
- self.config.writer_mode,
- compression,
- object_meta.into(),
- object_store.clone(),
- )
- .await?;
- writers.push(writer);
- }
- }
- true => {
- let serializer = JsonSerializer::new();
- serializers.push(Box::new(serializer));
- let file_path = base_path.prefix();
- let object_meta = ObjectMeta {
- location: file_path.clone(),
- last_modified: chrono::offset::Utc::now(),
- size: 0,
- e_tag: None,
- };
- let writer = create_writer(
- self.config.writer_mode,
- compression,
- object_meta.into(),
- object_store.clone(),
- )
- .await?;
- writers.push(writer);
- }
- }
+ let total_count = self.multipartput_all(data, context).await?;
+ Ok(total_count)
+ }
+ FileWriterMode::Put => {
+ return not_impl_err!("FileWriterMode::Put is not supported
yet!")
}
}
-
- stateless_serialize_and_write_files(
- data,
- serializers,
- writers,
- self.config.single_file_output,
- self.config.unbounded_input,
- )
- .await
}
}
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs
b/datafusion/core/src/datasource/file_format/parquet.rs
index 859bff7ae4..12d5d515bb 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -23,7 +23,7 @@ use std::fmt::Debug;
use std::io::Write;
use std::sync::Arc;
-use super::write::{create_writer, AbortableWrite, FileWriterMode};
+use super::write::{create_writer, start_demuxer_task, AbortableWrite,
FileWriterMode};
use super::{FileFormat, FileScanConfig};
use crate::config::ConfigOptions;
@@ -62,7 +62,6 @@ use parquet::file::metadata::ParquetMetaData;
use parquet::file::properties::WriterProperties;
use parquet::file::statistics::Statistics as ParquetStatistics;
use parquet::file::writer::SerializedFileWriter;
-use rand::distributions::{Alphanumeric, DistString};
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::task::{JoinHandle, JoinSet};
@@ -641,79 +640,6 @@ impl ParquetSink {
}
}
- /// Creates an AsyncArrowWriter for each partition to be written out
- /// AsyncArrowWriters are used when individual parquet file serialization
is not parallelized
- async fn create_all_async_arrow_writers(
- &self,
- num_partitions: usize,
- parquet_props: &WriterProperties,
- object_store: Arc<dyn ObjectStore>,
- ) -> Result<
- Vec<AsyncArrowWriter<Box<dyn tokio::io::AsyncWrite + std::marker::Send
+ Unpin>>>,
- > {
- // Construct writer for each file group
- let mut writers = vec![];
- match self.config.writer_mode {
- FileWriterMode::Append => {
- return plan_err!(
- "Parquet format does not support appending to existing
file!"
- )
- }
- FileWriterMode::Put => {
- return not_impl_err!("Put Mode is not implemented for
ParquetSink yet")
- }
- FileWriterMode::PutMultipart => {
- // Currently assuming only 1 partition path (i.e. not
hive-style partitioning on a column)
- let base_path = &self.config.table_paths[0];
- match self.config.single_file_output {
- false => {
- // Uniquely identify this batch of files with a random
string, to prevent collisions overwriting files
- let write_id =
- Alphanumeric.sample_string(&mut
rand::thread_rng(), 16);
- for part_idx in 0..num_partitions {
- let file_path = base_path
- .prefix()
- .child(format!("{}_{}.parquet", write_id,
part_idx));
- let object_meta = ObjectMeta {
- location: file_path,
- last_modified: chrono::offset::Utc::now(),
- size: 0,
- e_tag: None,
- };
- let writer = self
- .create_async_arrow_writer(
- object_meta.into(),
- object_store.clone(),
- parquet_props.clone(),
- )
- .await?;
- writers.push(writer);
- }
- }
- true => {
- let file_path = base_path.prefix();
- let object_meta = ObjectMeta {
- location: file_path.clone(),
- last_modified: chrono::offset::Utc::now(),
- size: 0,
- e_tag: None,
- };
- let writer = self
- .create_async_arrow_writer(
- object_meta.into(),
- object_store.clone(),
- parquet_props.clone(),
- )
- .await?;
- writers.push(writer);
- }
- }
- }
- }
-
- Ok(writers)
- }
-
/// Creates an object store writer for each output partition
/// This is used when parallelizing individual parquet file writes.
async fn create_object_store_writers(
@@ -758,10 +684,9 @@ impl DataSink for ParquetSink {
async fn write_all(
&self,
- mut data: Vec<SendableRecordBatchStream>,
+ data: SendableRecordBatchStream,
context: &Arc<TaskContext>,
) -> Result<u64> {
- let num_partitions = data.len();
let parquet_props = self
.config
.file_type_writer_options
@@ -772,63 +697,93 @@ impl DataSink for ParquetSink {
.runtime_env()
.object_store(&self.config.object_store_url)?;
- let mut row_count = 0;
+ let exec_options = &context.session_config().options().execution;
+
+ let allow_single_file_parallelism =
+ exec_options.parquet.allow_single_file_parallelism;
+
+ // This is a temporary special case until
https://github.com/apache/arrow-datafusion/pull/7655
+ // can be pulled in.
+ if allow_single_file_parallelism && self.config.single_file_output {
+ let object_store_writer = self
+ .create_object_store_writers(1, object_store)
+ .await?
+ .remove(0);
+
+ let schema_clone = self.config.output_schema.clone();
+ return output_single_parquet_file_parallelized(
+ object_store_writer,
+ vec![data],
+ schema_clone,
+ parquet_props,
+ )
+ .await
+ .map(|r| r as u64);
+ }
- let allow_single_file_parallelism = context
- .session_config()
- .options()
- .execution
- .parquet
- .allow_single_file_parallelism;
-
- match self.config.single_file_output {
- false => {
- let writers = self
- .create_all_async_arrow_writers(
- num_partitions,
- parquet_props,
- object_store.clone(),
- )
- .await?;
- // TODO parallelize individual parquet serialization when
already outputting multiple parquet files
- // e.g. if outputting 2 parquet files on a system with 32
threads, spawn 16 tasks for each individual
- // file to be serialized.
- row_count = output_multiple_parquet_files(writers,
data).await?;
- }
- true => {
- if !allow_single_file_parallelism || data.len() <= 1 {
- let mut writer = self
- .create_all_async_arrow_writers(
- num_partitions,
- parquet_props,
- object_store.clone(),
- )
- .await?
- .remove(0);
- for data_stream in data.iter_mut() {
- while let Some(batch) =
data_stream.next().await.transpose()? {
- row_count += batch.num_rows();
- writer.write(&batch).await?;
- }
+ let (demux_task, mut file_stream_rx) = start_demuxer_task(
+ data,
+ context,
+ None,
+ self.config.table_paths[0].clone(),
+ "parquet".into(),
+ self.config.single_file_output,
+ );
+
+ let mut file_write_tasks: JoinSet<std::result::Result<usize,
DataFusionError>> =
+ JoinSet::new();
+ while let Some((path, mut rx)) = file_stream_rx.recv().await {
+ let mut writer = self
+ .create_async_arrow_writer(
+ ObjectMeta {
+ location: path,
+ last_modified: chrono::offset::Utc::now(),
+ size: 0,
+ e_tag: None,
}
+ .into(),
+ object_store.clone(),
+ parquet_props.clone(),
+ )
+ .await?;
- writer.close().await?;
- } else {
- let object_store_writer = self
- .create_object_store_writers(1, object_store)
- .await?
- .remove(0);
- row_count = output_single_parquet_file_parallelized(
- object_store_writer,
- data,
- self.config.output_schema.clone(),
- parquet_props,
- )
- .await?;
+ file_write_tasks.spawn(async move {
+ let mut row_count = 0;
+ while let Some(batch) = rx.recv().await {
+ row_count += batch.num_rows();
+ writer.write(&batch).await?;
+ }
+ writer.close().await?;
+ Ok(row_count)
+ });
+ }
+
+ let mut row_count = 0;
+ while let Some(result) = file_write_tasks.join_next().await {
+ match result {
+ Ok(r) => {
+ row_count += r?;
+ }
+ Err(e) => {
+ if e.is_panic() {
+ std::panic::resume_unwind(e.into_panic());
+ } else {
+ unreachable!();
+ }
}
}
}
+ match demux_task.await {
+ Ok(r) => r?,
+ Err(e) => {
+ if e.is_panic() {
+ std::panic::resume_unwind(e.into_panic());
+ } else {
+ unreachable!();
+ }
+ }
+ }
Ok(row_count as u64)
}
}
@@ -974,48 +929,6 @@ async fn output_single_parquet_file_parallelized(
};
object_store_writer.write_all(final_buff.as_slice()).await?;
object_store_writer.shutdown().await?;
- println!("done!");
-
- Ok(row_count)
-}
-
-/// Serializes multiple parquet files independently in parallel from different
RecordBatch streams.
-/// AsyncArrowWriter is used to coordinate serialization and MultiPart puts to
ObjectStore
-/// Only a single CPU thread is used to serialize each individual parquet
file, so write speed and overall
-/// CPU utilization is dependent on the number of output files.
-async fn output_multiple_parquet_files(
- writers: Vec<
- AsyncArrowWriter<Box<dyn tokio::io::AsyncWrite + std::marker::Send +
Unpin>>,
- >,
- data: Vec<SendableRecordBatchStream>,
-) -> Result<usize> {
- let mut row_count = 0;
- let mut join_set: JoinSet<Result<usize, DataFusionError>> = JoinSet::new();
- for (mut data_stream, mut writer) in
data.into_iter().zip(writers.into_iter()) {
- join_set.spawn(async move {
- let mut cnt = 0;
- while let Some(batch) = data_stream.next().await.transpose()? {
- cnt += batch.num_rows();
- writer.write(&batch).await?;
- }
- writer.close().await?;
- Ok(cnt)
- });
- }
- while let Some(result) = join_set.join_next().await {
- match result {
- Ok(res) => {
- row_count += res?;
- } // propagate DataFusion error
- Err(e) => {
- if e.is_panic() {
- std::panic::resume_unwind(e.into_panic());
- } else {
- unreachable!();
- }
- }
- }
- }
Ok(row_count)
}
diff --git a/datafusion/core/src/datasource/file_format/write.rs
b/datafusion/core/src/datasource/file_format/write.rs
index 42d18eef63..928d0d1ba5 100644
--- a/datafusion/core/src/datasource/file_format/write.rs
+++ b/datafusion/core/src/datasource/file_format/write.rs
@@ -25,24 +25,27 @@ use std::sync::Arc;
use std::task::{Context, Poll};
use crate::datasource::file_format::file_compression_type::FileCompressionType;
-use crate::datasource::physical_plan::FileMeta;
+use crate::datasource::listing::{ListingTableUrl, PartitionedFile};
+use crate::datasource::physical_plan::{FileMeta, FileSinkConfig};
use crate::error::Result;
use crate::physical_plan::SendableRecordBatchStream;
use arrow_array::RecordBatch;
-use datafusion_common::{exec_err, internal_err, DataFusionError};
+use datafusion_common::{exec_err, DataFusionError};
use async_trait::async_trait;
use bytes::Bytes;
-use datafusion_execution::RecordBatchStream;
+use datafusion_execution::TaskContext;
use futures::future::BoxFuture;
use futures::FutureExt;
use futures::{ready, StreamExt};
use object_store::path::Path;
use object_store::{MultipartId, ObjectMeta, ObjectStore};
+use rand::distributions::DistString;
use tokio::io::{AsyncWrite, AsyncWriteExt};
-use tokio::sync::mpsc;
+use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::task::{JoinHandle, JoinSet};
+use tokio::try_join;
/// `AsyncPutWriter` is an object that facilitates asynchronous writing to
object stores.
/// It is specifically designed for the `object_store` crate's `put` method
and sends
@@ -308,22 +311,20 @@ type SerializerType = Box<dyn BatchSerializer>;
/// concurrently. Data order is preserved. In the event of an error,
/// the ObjectStore writer is returned to the caller in addition to an error,
/// so that the caller may handle aborting failed writes.
-async fn serialize_rb_stream_to_object_store(
- mut data_stream: Pin<Box<dyn RecordBatchStream + Send>>,
+pub(crate) async fn serialize_rb_stream_to_object_store(
+ mut data_rx: Receiver<RecordBatch>,
mut serializer: Box<dyn BatchSerializer>,
mut writer: AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
unbounded_input: bool,
-) -> std::result::Result<(SerializerType, WriterType, u64), (WriterType,
DataFusionError)>
-{
+) -> std::result::Result<(WriterType, u64), (WriterType, DataFusionError)> {
let (tx, mut rx) =
mpsc::channel::<JoinHandle<Result<(usize, Bytes),
DataFusionError>>>(100);
let serialize_task = tokio::spawn(async move {
- while let Some(maybe_batch) = data_stream.next().await {
+ while let Some(batch) = data_rx.recv().await {
match serializer.duplicate() {
Ok(mut serializer_clone) => {
let handle = tokio::spawn(async move {
- let batch = maybe_batch?;
let num_rows = batch.num_rows();
let bytes = serializer_clone.serialize(batch).await?;
Ok((num_rows, bytes))
@@ -344,7 +345,7 @@ async fn serialize_rb_stream_to_object_store(
}
}
}
- Ok(serializer)
+ Ok(())
});
let mut row_count = 0;
@@ -380,8 +381,8 @@ async fn serialize_rb_stream_to_object_store(
}
}
- let serializer = match serialize_task.await {
- Ok(Ok(serializer)) => serializer,
+ match serialize_task.await {
+ Ok(Ok(_)) => (),
Ok(Err(e)) => return Err((writer, e)),
Err(_) => {
return Err((
@@ -390,29 +391,166 @@ async fn serialize_rb_stream_to_object_store(
))
}
};
- Ok((serializer, writer, row_count as u64))
+ Ok((writer, row_count as u64))
}
+type RecordBatchReceiver = Receiver<RecordBatch>;
+type DemuxedStreamReceiver = Receiver<(Path, RecordBatchReceiver)>;
+
+/// Splits a single [SendableRecordBatchStream] into a dynamically determined
+/// number of partitions at execution time. The partitions are determined by
+/// factors known only at execution time, such as total number of rows and
+/// partition column values. The demuxer task communicates to the caller
+/// by sending channels over a channel. The inner channels send RecordBatches
+/// which should be contained within the same output file. The outer channel
+/// is used to send a dynamic number of inner channels, representing a dynamic
+/// number of total output files. The caller is also responsible to monitor
+/// the demux task for errors and abort accordingly. The single_file_output parameter
+/// overrides all other settings to force only a single file to be written.
+/// The partition_by parameter is intended to additionally split the input based on the
+/// unique values of a specific column (not yet implemented, see <https://github.com/apache/arrow-datafusion/issues/7744>).
+///                                                                              ┌───────────┐               ┌────────────┐    ┌─────────────┐
+///                                                                     ┌──────▶ │  batch 1  ├────▶...──────▶│   Batch a  │    │ Output File1│
+///                                                                     │        └───────────┘               └────────────┘    └─────────────┘
+///                                                                     │
+///                                                 ┌──────────┐        │        ┌───────────┐               ┌────────────┐    ┌─────────────┐
+/// ┌───────────┐               ┌────────────┐      │          │        ├──────▶ │  batch a+1├────▶...──────▶│   Batch b  │    │ Output File2│
+/// │  batch 1  ├────▶...──────▶│   Batch N  ├─────▶│  Demux   ├────────┤ ...    └───────────┘               └────────────┘    └─────────────┘
+/// └───────────┘               └────────────┘      │          │        │
+///                                                 └──────────┘        │        ┌───────────┐               ┌────────────┐    ┌─────────────┐
+///                                                                     └──────▶ │  batch d  ├────▶...──────▶│   Batch n  │    │ Output FileN│
+///                                                                              └───────────┘               └────────────┘    └─────────────┘
+pub(crate) fn start_demuxer_task(
+ input: SendableRecordBatchStream,
+ context: &Arc<TaskContext>,
+ _partition_by: Option<&str>,
+ base_output_path: ListingTableUrl,
+ file_extension: String,
+ single_file_output: bool,
+) -> (JoinHandle<Result<()>>, DemuxedStreamReceiver) {
+ let exec_options = &context.session_config().options().execution;
+
+ let max_rows_per_file = exec_options.soft_max_rows_per_output_file;
+ let max_parallel_files = exec_options.max_parallel_ouput_files;
+ let max_buffered_batches =
exec_options.max_buffered_batches_per_output_file;
+
+ let (tx, rx) = mpsc::channel(max_parallel_files);
+
+ let task = tokio::spawn(async move {
+ row_count_demuxer(
+ input,
+ base_output_path,
+ file_extension,
+ single_file_output,
+ max_rows_per_file,
+ max_buffered_batches,
+ tx,
+ )
+ .await
+ });
+ (task, rx)
+}
+
+fn generate_file_path(
+ base_output_path: &ListingTableUrl,
+ write_id: &str,
+ part_idx: usize,
+ file_extension: &str,
+ single_file_output: bool,
+) -> Path {
+ if !single_file_output {
+ base_output_path
+ .prefix()
+ .child(format!("{}_{}.{}", write_id, part_idx, file_extension))
+ } else {
+ base_output_path.prefix().to_owned()
+ }
+}
+
+async fn create_new_file_stream(
+ base_output_path: &ListingTableUrl,
+ write_id: &str,
+ part_idx: usize,
+ file_extension: &str,
+ single_file_output: bool,
+ max_buffered_batches: usize,
+ tx: &mut Sender<(Path, Receiver<RecordBatch>)>,
+) -> Result<Sender<RecordBatch>> {
+ let file_path = generate_file_path(
+ base_output_path,
+ write_id,
+ part_idx,
+ file_extension,
+ single_file_output,
+ );
+ let (tx_file, rx_file) = mpsc::channel(max_buffered_batches / 2);
+ tx.send((file_path, rx_file)).await.map_err(|_| {
+ DataFusionError::Execution("Error sending RecordBatch to file
stream!".into())
+ })?;
+ Ok(tx_file)
+}
+
+async fn row_count_demuxer(
+ mut input: SendableRecordBatchStream,
+ base_output_path: ListingTableUrl,
+ file_extension: String,
+ single_file_output: bool,
+ max_rows_per_file: usize,
+ max_buffered_batches: usize,
+ mut tx: Sender<(Path, Receiver<RecordBatch>)>,
+) -> Result<()> {
+ let mut total_rows_current_file = 0;
+ let mut part_idx = 0;
+ let write_id =
+ rand::distributions::Alphanumeric.sample_string(&mut
rand::thread_rng(), 16);
+
+ let mut tx_file = create_new_file_stream(
+ &base_output_path,
+ &write_id,
+ part_idx,
+ &file_extension,
+ single_file_output,
+ max_buffered_batches,
+ &mut tx,
+ )
+ .await?;
+ part_idx += 1;
+
+ while let Some(rb) = input.next().await.transpose()? {
+ total_rows_current_file += rb.num_rows();
+ tx_file.send(rb).await.map_err(|_| {
+ DataFusionError::Execution("Error sending RecordBatch to file
stream!".into())
+ })?;
+
+ if total_rows_current_file >= max_rows_per_file && !single_file_output
{
+ total_rows_current_file = 0;
+ tx_file = create_new_file_stream(
+ &base_output_path,
+ &write_id,
+ part_idx,
+ &file_extension,
+ single_file_output,
+ max_buffered_batches,
+ &mut tx,
+ )
+ .await?;
+ part_idx += 1;
+ }
+ }
+ Ok(())
+}
+
+type FileWriteBundle = (Receiver<RecordBatch>, SerializerType, WriterType);
/// Contains the common logic for serializing RecordBatches and
/// writing the resulting bytes to an ObjectStore.
/// Serialization is assumed to be stateless, i.e.
/// each RecordBatch can be serialized without any
/// dependency on the RecordBatches before or after.
pub(crate) async fn stateless_serialize_and_write_files(
- data: Vec<SendableRecordBatchStream>,
- mut serializers: Vec<SerializerType>,
- mut writers: Vec<WriterType>,
- single_file_output: bool,
+ mut rx: Receiver<FileWriteBundle>,
+ tx: tokio::sync::oneshot::Sender<u64>,
unbounded_input: bool,
-) -> Result<u64> {
- if single_file_output && (serializers.len() != 1 || writers.len() != 1) {
- return internal_err!("single_file_output is true, but got more than 1
writer!");
- }
- let num_partitions = data.len();
- let num_writers = writers.len();
- if !single_file_output && (num_partitions != num_writers) {
- return internal_err!("single_file_ouput is false, but did not get 1
writer for each output partition!");
- }
+) -> Result<()> {
let mut row_count = 0;
// tracks if any writers encountered an error triggering the need to abort
let mut any_errors = false;
@@ -421,100 +559,58 @@ pub(crate) async fn stateless_serialize_and_write_files(
// tracks if any errors were encountered in the process of aborting
writers.
// if true, we may not have a guarentee that all written data was cleaned
up.
let mut any_abort_errors = false;
- match single_file_output {
- false => {
- let mut join_set = JoinSet::new();
- for (data_stream, serializer, writer) in data
- .into_iter()
- .zip(serializers.into_iter())
- .zip(writers.into_iter())
- .map(|((a, b), c)| (a, b, c))
- {
- join_set.spawn(async move {
- serialize_rb_stream_to_object_store(
- data_stream,
- serializer,
- writer,
- unbounded_input,
- )
- .await
- });
- }
- let mut finished_writers = Vec::with_capacity(num_writers);
- while let Some(result) = join_set.join_next().await {
- match result {
- Ok(res) => match res {
- Ok((_, writer, cnt)) => {
- finished_writers.push(writer);
- row_count += cnt;
- }
- Err((writer, e)) => {
- finished_writers.push(writer);
- any_errors = true;
- triggering_error = Some(e);
- }
- },
- Err(e) => {
- // Don't panic, instead try to clean up as many
writers as possible.
- // If we hit this code, ownership of a writer was not
joined back to
- // this thread, so we cannot clean it up (hence
any_abort_errors is true)
- any_errors = true;
- any_abort_errors = true;
- triggering_error =
Some(DataFusionError::Internal(format!(
- "Unexpected join error while serializing file {e}"
- )));
- }
+ let mut join_set = JoinSet::new();
+ while let Some((data_rx, serializer, writer)) = rx.recv().await {
+ join_set.spawn(async move {
+ serialize_rb_stream_to_object_store(
+ data_rx,
+ serializer,
+ writer,
+ unbounded_input,
+ )
+ .await
+ });
+ }
+ let mut finished_writers = Vec::new();
+ while let Some(result) = join_set.join_next().await {
+ match result {
+ Ok(res) => match res {
+ Ok((writer, cnt)) => {
+ finished_writers.push(writer);
+ row_count += cnt;
}
- }
-
- // Finalize or abort writers as appropriate
- for mut writer in finished_writers.into_iter() {
- match any_errors {
- true => {
- let abort_result = writer.abort_writer();
- if abort_result.is_err() {
- any_abort_errors = true;
- }
- }
- false => {
- writer.shutdown()
- .await
- .map_err(|_| DataFusionError::Internal("Error
encountered while finalizing writes! Partial results may have been written to
ObjectStore!".into()))?;
- }
+ Err((writer, e)) => {
+ finished_writers.push(writer);
+ any_errors = true;
+ triggering_error = Some(e);
}
+ },
+ Err(e) => {
+ // Don't panic, instead try to clean up as many writers as
possible.
+ // If we hit this code, ownership of a writer was not joined
back to
+ // this thread, so we cannot clean it up (hence
any_abort_errors is true)
+ any_errors = true;
+ any_abort_errors = true;
+ triggering_error = Some(DataFusionError::Internal(format!(
+ "Unexpected join error while serializing file {e}"
+ )));
}
}
- true => {
- let mut writer = writers.remove(0);
- let mut serializer = serializers.remove(0);
- let mut cnt;
- for data_stream in data.into_iter() {
- (serializer, writer, cnt) = match
serialize_rb_stream_to_object_store(
- data_stream,
- serializer,
- writer,
- unbounded_input,
- )
- .await
- {
- Ok((s, w, c)) => (s, w, c),
- Err((w, e)) => {
- any_errors = true;
- triggering_error = Some(e);
- writer = w;
- break;
- }
- };
- row_count += cnt;
- }
- match any_errors {
- true => {
- let abort_result = writer.abort_writer();
- if abort_result.is_err() {
- any_abort_errors = true;
- }
+ }
+
+ // Finalize or abort writers as appropriate
+ for mut writer in finished_writers.into_iter() {
+ match any_errors {
+ true => {
+ let abort_result = writer.abort_writer();
+ if abort_result.is_err() {
+ any_abort_errors = true;
}
- false => writer.shutdown().await?,
+ }
+ false => {
+ writer.shutdown()
+ .await
+ .map_err(|_| DataFusionError::Internal("Error encountered
while finalizing writes! Partial results may have been written to
ObjectStore!".into()))?;
}
}
}
@@ -529,5 +625,190 @@ pub(crate) async fn stateless_serialize_and_write_files(
}
}
- Ok(row_count)
+ tx.send(row_count).map_err(|_| {
+ DataFusionError::Internal(
+ "Error encountered while sending row count back to file
sink!".into(),
+ )
+ })?;
+ Ok(())
+}
+
+/// Orchestrates multipart put of a dynamic number of output files from a single input stream
+/// for any statelessly serialized file type. That is, any file type for which each [RecordBatch]
+/// can be serialized independently of all other [RecordBatch]s.
+///
+/// A demuxer task splits `data` into per-file batch streams. For each stream a
+/// serializer (from `get_serializer`) and a `PutMultipart` writer are created and
+/// handed to a write coordinator task.
+///
+/// Returns the row count reported by the write coordinator.
+///
+/// # Errors
+/// Propagates failures from object store lookup, writer creation, the demuxer
+/// task and the write coordinator task. A panic in either task is resumed on
+/// the calling task.
+pub(crate) async fn stateless_multipart_put(
+    data: SendableRecordBatchStream,
+    context: &Arc<TaskContext>,
+    file_extension: String,
+    get_serializer: Box<dyn Fn() -> Box<dyn BatchSerializer> + Send>,
+    config: &FileSinkConfig,
+    compression: FileCompressionType,
+) -> Result<u64> {
+    let object_store = context
+        .runtime_env()
+        .object_store(&config.object_store_url)?;
+
+    let single_file_output = config.single_file_output;
+    let base_output_path = &config.table_paths[0];
+    let unbounded_input = config.unbounded_input;
+
+    let (demux_task, mut file_stream_rx) = start_demuxer_task(
+        data,
+        context,
+        None,
+        base_output_path.clone(),
+        file_extension,
+        single_file_output,
+    );
+
+    let rb_buffer_size = context
+        .session_config()
+        .options()
+        .execution
+        .max_buffered_batches_per_output_file;
+
+    // A bounded channel cannot be created with capacity 0 (tokio panics), which
+    // is what halving a configured value of 0 or 1 yields. Keep at least one slot.
+    let (tx_file_bundle, rx_file_bundle) =
+        tokio::sync::mpsc::channel((rb_buffer_size / 2).max(1));
+    let (tx_row_cnt, rx_row_cnt) = tokio::sync::oneshot::channel();
+    let write_coordinater_task = tokio::spawn(async move {
+        stateless_serialize_and_write_files(rx_file_bundle, tx_row_cnt, unbounded_input)
+            .await
+    });
+    while let Some((output_location, rb_stream)) = file_stream_rx.recv().await {
+        let serializer = get_serializer();
+        let object_meta = ObjectMeta {
+            location: output_location,
+            last_modified: chrono::offset::Utc::now(),
+            size: 0,
+            e_tag: None,
+        };
+        let writer = create_writer(
+            FileWriterMode::PutMultipart,
+            compression,
+            object_meta.into(),
+            object_store.clone(),
+        )
+        .await?;
+
+        tx_file_bundle
+            .send((rb_stream, serializer, writer))
+            .await
+            .map_err(|_| {
+                DataFusionError::Internal(
+                    "Writer receive file bundle channel closed unexpectedly!".into(),
+                )
+            })?;
+    }
+
+    // Signal to the write coordinator that no more files are coming
+    drop(tx_file_bundle);
+
+    // Join both tasks before reading the row count. If the coordinator returns
+    // early without sending a count, awaiting the oneshot first would replace
+    // the task's own error with a generic "did not receive row count" error.
+    match try_join!(write_coordinater_task, demux_task) {
+        Ok((r1, r2)) => {
+            r1?;
+            r2?;
+        }
+        Err(e) => {
+            if e.is_panic() {
+                std::panic::resume_unwind(e.into_panic());
+            } else {
+                unreachable!();
+            }
+        }
+    }
+
+    let total_count = rx_row_cnt.await.map_err(|_| {
+        DataFusionError::Internal(
+            "Did not receieve row count from write coordinater".into(),
+        )
+    })?;
+
+    Ok(total_count)
+}
+
+/// Orchestrates append_all for any statelessly serialized file type. Appends to all files provided
+/// in a round robin fashion.
+///
+/// One `Append` writer and one serializer are created per entry of `file_groups`;
+/// `get_serializer` is called with that file's `object_meta.size`. Incoming
+/// batches are then dealt out to the files one at a time, in order.
+///
+/// Returns the row count reported by the write coordinator.
+///
+/// # Errors
+/// Returns an internal error if `file_groups` is empty, and propagates failures
+/// from writer creation, the input stream and the write coordinator task. A
+/// panic in the coordinator task is resumed on the calling task.
+pub(crate) async fn stateless_append_all(
+    mut data: SendableRecordBatchStream,
+    context: &Arc<TaskContext>,
+    object_store: Arc<dyn ObjectStore>,
+    file_groups: &Vec<PartitionedFile>,
+    unbounded_input: bool,
+    compression: FileCompressionType,
+    get_serializer: Box<dyn Fn(usize) -> Box<dyn BatchSerializer> + Send>,
+) -> Result<u64> {
+    // With no target files the bundle channel below would be created with
+    // capacity 0, which panics. Report the condition as an error instead.
+    if file_groups.is_empty() {
+        return Err(DataFusionError::Internal(
+            "Cannot append: no existing files were provided to append to!".into(),
+        ));
+    }
+
+    let rb_buffer_size = context
+        .session_config()
+        .options()
+        .execution
+        .max_buffered_batches_per_output_file;
+
+    // Sized to hold every bundle, so the sends below never wait on the
+    // coordinator task, which is only spawned after this loop.
+    let (tx_file_bundle, rx_file_bundle) =
+        tokio::sync::mpsc::channel(file_groups.len());
+    let mut send_channels = Vec::with_capacity(file_groups.len());
+    for file_group in file_groups {
+        let serializer = get_serializer(file_group.object_meta.size);
+
+        let writer = create_writer(
+            FileWriterMode::Append,
+            compression,
+            file_group.object_meta.clone().into(),
+            object_store.clone(),
+        )
+        .await?;
+
+        // Keep at least one slot: a capacity of 0 panics, and halving a
+        // configured value of 0 or 1 would produce exactly that.
+        let (tx, rx) = tokio::sync::mpsc::channel((rb_buffer_size / 2).max(1));
+        send_channels.push(tx);
+        tx_file_bundle
+            .send((rx, serializer, writer))
+            .await
+            .map_err(|_| {
+                DataFusionError::Internal(
+                    "Writer receive file bundle channel closed unexpectedly!".into(),
+                )
+            })?;
+    }
+
+    let (tx_row_cnt, rx_row_cnt) = tokio::sync::oneshot::channel();
+    let write_coordinater_task = tokio::spawn(async move {
+        stateless_serialize_and_write_files(rx_file_bundle, tx_row_cnt, unbounded_input)
+            .await
+    });
+
+    // Append to file groups in round robin
+    let mut next_file_idx = 0;
+    while let Some(rb) = data.next().await.transpose()? {
+        send_channels[next_file_idx].send(rb).await.map_err(|_| {
+            DataFusionError::Internal(
+                "Recordbatch file append stream closed unexpectedly!".into(),
+            )
+        })?;
+        next_file_idx = (next_file_idx + 1) % send_channels.len();
+        if unbounded_input {
+            tokio::task::yield_now().await;
+        }
+    }
+    // Signal to the write coordinator that no more files are coming
+    drop(tx_file_bundle);
+    drop(send_channels);
+
+    // Join the coordinator before reading the row count. If it returns early
+    // without sending a count, awaiting the oneshot first would replace the
+    // task's own error with a generic "did not receive row count" error.
+    match write_coordinater_task.await {
+        Ok(r1) => r1?,
+        Err(e) => {
+            if e.is_panic() {
+                std::panic::resume_unwind(e.into_panic());
+            } else {
+                unreachable!();
+            }
+        }
+    }
+
+    let total_count = rx_row_cnt.await.map_err(|_| {
+        DataFusionError::Internal(
+            "Did not receieve row count from write coordinater".into(),
+        )
+    })?;
+
+    Ok(total_count)
}
diff --git a/datafusion/core/src/datasource/listing/table.rs
b/datafusion/core/src/datasource/listing/table.rs
index ca86e3e3c7..05d8ba6c45 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -1615,6 +1615,10 @@ mod tests {
async fn test_insert_into_append_new_json_files() -> Result<()> {
let mut config_map: HashMap<String, String> = HashMap::new();
config_map.insert("datafusion.execution.batch_size".into(),
"1".into());
+ config_map.insert(
+ "datafusion.execution.soft_max_rows_per_output_file".into(),
+ "1".into(),
+ );
helper_test_append_new_files_to_table(
FileType::JSON,
FileCompressionType::UNCOMPRESSED,
@@ -1639,6 +1643,10 @@ mod tests {
async fn test_insert_into_append_new_csv_files() -> Result<()> {
let mut config_map: HashMap<String, String> = HashMap::new();
config_map.insert("datafusion.execution.batch_size".into(),
"1".into());
+ config_map.insert(
+ "datafusion.execution.soft_max_rows_per_output_file".into(),
+ "1".into(),
+ );
helper_test_append_new_files_to_table(
FileType::CSV,
FileCompressionType::UNCOMPRESSED,
@@ -1652,6 +1660,10 @@ mod tests {
async fn test_insert_into_append_new_parquet_files_defaults() ->
Result<()> {
let mut config_map: HashMap<String, String> = HashMap::new();
config_map.insert("datafusion.execution.batch_size".into(),
"1".into());
+ config_map.insert(
+ "datafusion.execution.soft_max_rows_per_output_file".into(),
+ "1".into(),
+ );
helper_test_append_new_files_to_table(
FileType::PARQUET,
FileCompressionType::UNCOMPRESSED,
@@ -1782,6 +1794,11 @@ mod tests {
#[tokio::test]
async fn test_insert_into_append_new_parquet_files_session_overrides() ->
Result<()> {
let mut config_map: HashMap<String, String> = HashMap::new();
+ config_map.insert("datafusion.execution.batch_size".into(),
"1".into());
+ config_map.insert(
+ "datafusion.execution.soft_max_rows_per_output_file".into(),
+ "1".into(),
+ );
config_map.insert(
"datafusion.execution.parquet.compression".into(),
"zstd(5)".into(),
@@ -2090,7 +2107,6 @@ mod tests {
}
None => SessionContext::new(),
};
- let target_partition_number =
session_ctx.state().config().target_partitions();
// Create a new schema with one field called "a" of type Int32
let schema = Arc::new(Schema::new(vec![Field::new(
@@ -2230,7 +2246,7 @@ mod tests {
// Assert that `target_partition_number` many files were added to the
table.
let num_files = tmp_dir.path().read_dir()?.count();
- assert_eq!(num_files, target_partition_number);
+ assert_eq!(num_files, 3);
// Create a physical plan from the insert plan
let plan = session_ctx
@@ -2273,7 +2289,7 @@ mod tests {
// Assert that another `target_partition_number` many files were added
to the table.
let num_files = tmp_dir.path().read_dir()?.count();
- assert_eq!(num_files, 2 * target_partition_number);
+ assert_eq!(num_files, 6);
// Return Ok if the function
Ok(())
diff --git a/datafusion/core/src/datasource/memory.rs
b/datafusion/core/src/datasource/memory.rs
index ba99a2b695..a2f8e225e1 100644
--- a/datafusion/core/src/datasource/memory.rs
+++ b/datafusion/core/src/datasource/memory.rs
@@ -270,7 +270,7 @@ impl DataSink for MemSink {
async fn write_all(
&self,
- mut data: Vec<SendableRecordBatchStream>,
+ mut data: SendableRecordBatchStream,
_context: &Arc<TaskContext>,
) -> Result<u64> {
let num_partitions = self.batches.len();
@@ -280,14 +280,10 @@ impl DataSink for MemSink {
let mut new_batches = vec![vec![]; num_partitions];
let mut i = 0;
let mut row_count = 0;
- let num_parts = data.len();
- // TODO parallelize outer and inner loops
- for data_part in data.iter_mut().take(num_parts) {
- while let Some(batch) = data_part.next().await.transpose()? {
- row_count += batch.num_rows();
- new_batches[i].push(batch);
- i = (i + 1) % num_partitions;
- }
+ while let Some(batch) = data.next().await.transpose()? {
+ row_count += batch.num_rows();
+ new_batches[i].push(batch);
+ i = (i + 1) % num_partitions;
}
// write the outputs into the batches
diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs
b/datafusion/core/src/datasource/physical_plan/parquet.rs
index c781ad81c1..e59686453f 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -1976,24 +1976,7 @@ mod tests {
ParquetReadOptions::default(),
)
.await?;
- ctx.register_parquet(
- "part1",
- &format!("{out_dir}/{write_id}_1.parquet"),
- ParquetReadOptions::default(),
- )
- .await?;
- ctx.register_parquet(
- "part2",
- &format!("{out_dir}/{write_id}_2.parquet"),
- ParquetReadOptions::default(),
- )
- .await?;
- ctx.register_parquet(
- "part3",
- &format!("{out_dir}/{write_id}_3.parquet"),
- ParquetReadOptions::default(),
- )
- .await?;
+
ctx.register_parquet("allparts", &out_dir,
ParquetReadOptions::default())
.await?;
diff --git a/datafusion/physical-plan/src/insert.rs
b/datafusion/physical-plan/src/insert.rs
index 5b58a0a771..d1f2706930 100644
--- a/datafusion/physical-plan/src/insert.rs
+++ b/datafusion/physical-plan/src/insert.rs
@@ -36,7 +36,7 @@ use arrow_array::{ArrayRef, UInt64Array};
use arrow_schema::{DataType, Field, Schema};
use datafusion_common::{exec_err, internal_err, DataFusionError, Result};
use datafusion_execution::TaskContext;
-use datafusion_physical_expr::PhysicalSortRequirement;
+use datafusion_physical_expr::{Distribution, PhysicalSortRequirement};
use async_trait::async_trait;
use futures::StreamExt;
@@ -68,7 +68,7 @@ pub trait DataSink: DisplayAs + Debug + Send + Sync {
/// or rollback required.
async fn write_all(
&self,
- data: Vec<SendableRecordBatchStream>,
+ data: SendableRecordBatchStream,
context: &Arc<TaskContext>,
) -> Result<u64>;
}
@@ -152,18 +152,6 @@ impl FileSinkExec {
}
}
- fn execute_all_input_streams(
- &self,
- context: Arc<TaskContext>,
- ) -> Result<Vec<SendableRecordBatchStream>> {
- let n_input_parts = self.input.output_partitioning().partition_count();
- let mut streams = Vec::with_capacity(n_input_parts);
- for part in 0..n_input_parts {
- streams.push(self.execute_input_stream(part, context.clone())?);
- }
- Ok(streams)
- }
-
/// Returns insert sink
pub fn sink(&self) -> &dyn DataSink {
self.sink.as_ref()
@@ -210,13 +198,17 @@ impl ExecutionPlan for FileSinkExec {
}
fn benefits_from_input_partitioning(&self) -> Vec<bool> {
- // Incoming number of partitions is taken to be the
- // number of files the query is required to write out.
- // The optimizer should not change this number.
- // Parrallelism is handled within the appropriate DataSink
+ // DataSink is responsible for dynamically partitioning its
+ // own input at execution time.
vec![false]
}
+ /// Requests `Distribution::SinglePartition` for every child of this node.
+ fn required_input_distribution(&self) -> Vec<Distribution> {
+ // DataSink is responsible for dynamically partitioning its
+ // own input at execution time, and so requires a single input partition.
+ vec![Distribution::SinglePartition; self.children().len()]
+ }
+
fn required_input_ordering(&self) ->
Vec<Option<Vec<PhysicalSortRequirement>>> {
// The input order is either exlicitly set (such as by a ListingTable),
// or require that the [FileSinkExec] gets the data in the order the
@@ -269,7 +261,7 @@ impl ExecutionPlan for FileSinkExec {
if partition != 0 {
return internal_err!("FileSinkExec can only be called on partition
0!");
}
- let data = self.execute_all_input_streams(context.clone())?;
+ let data = self.execute_input_stream(0, context.clone())?;
let count_schema = self.count_schema.clone();
let sink = self.sink.clone();
diff --git a/datafusion/sqllogictest/test_files/information_schema.slt
b/datafusion/sqllogictest/test_files/information_schema.slt
index d2ec21488d..8e22ad833f 100644
--- a/datafusion/sqllogictest/test_files/information_schema.slt
+++ b/datafusion/sqllogictest/test_files/information_schema.slt
@@ -150,6 +150,8 @@ datafusion.execution.aggregate.scalar_update_factor 10
datafusion.execution.batch_size 8192
datafusion.execution.coalesce_batches true
datafusion.execution.collect_statistics false
+datafusion.execution.max_buffered_batches_per_output_file 2
+datafusion.execution.max_parallel_ouput_files 8
datafusion.execution.meta_fetch_concurrency 32
datafusion.execution.parquet.allow_single_file_parallelism false
datafusion.execution.parquet.bloom_filter_enabled false
@@ -175,6 +177,7 @@ datafusion.execution.parquet.statistics_enabled NULL
datafusion.execution.parquet.write_batch_size 1024
datafusion.execution.parquet.writer_version 1.0
datafusion.execution.planning_concurrency 13
+datafusion.execution.soft_max_rows_per_output_file 50000000
datafusion.execution.sort_in_place_threshold_bytes 1048576
datafusion.execution.sort_spill_reservation_bytes 10485760
datafusion.execution.target_partitions 7
@@ -217,6 +220,8 @@ datafusion.execution.aggregate.scalar_update_factor 10
Specifies the threshold f
datafusion.execution.batch_size 8192 Default batch size while creating new
batches, it's especially useful for buffer-in-memory batches since creating
tiny batches would result in too much metadata memory consumption
datafusion.execution.coalesce_batches true When set to true, record batches
will be examined between each operator and small batches will be coalesced into
larger batches. This is helpful when there are highly selective filters or
joins that could produce tiny output batches. The target batch size is
determined by the configuration setting
datafusion.execution.collect_statistics false Should DataFusion collect
statistics after listing files
+datafusion.execution.max_buffered_batches_per_output_file 2 This is the
maximum number of RecordBatches buffered for each output file being worked.
Higher values can potentially give faster write performance at the cost of
higher peak memory consumption
+datafusion.execution.max_parallel_ouput_files 8 This is the maximum number of
output files being written in parallel. Higher values can potentially give
faster write performance at the cost of higher peak memory consumption.
datafusion.execution.meta_fetch_concurrency 32 Number of files to read in
parallel when inferring schema and statistics
datafusion.execution.parquet.allow_single_file_parallelism false Controls
whether DataFusion will attempt to speed up writing large parquet files by
first writing multiple smaller files and then stitching them together into a
single large file. This will result in faster write speeds, but higher memory
usage. Also currently unsupported are bloom filters and column indexes when
single_file_parallelism is enabled.
datafusion.execution.parquet.bloom_filter_enabled false Sets if bloom filter
is enabled for any column
@@ -242,6 +247,7 @@ datafusion.execution.parquet.statistics_enabled NULL Sets
if statistics are enab
datafusion.execution.parquet.write_batch_size 1024 Sets write_batch_size in
bytes
datafusion.execution.parquet.writer_version 1.0 Sets parquet writer version
valid values are "1.0" and "2.0"
datafusion.execution.planning_concurrency 13 Fan-out during initial physical
planning. This is mostly use to plan `UNION` children in parallel. Defaults to
the number of CPU cores on the system
+datafusion.execution.soft_max_rows_per_output_file 50000000 Target number of
rows in output files when writing multiple. This is a soft max, so it can be
exceeded slightly. There also will be one file smaller than the limit if the
total number of rows written is not roughly divisible by the soft max
datafusion.execution.sort_in_place_threshold_bytes 1048576 When sorting, below
what size should data be concatenated and sorted in a single RecordBatch rather
than sorted in batches and merged.
datafusion.execution.sort_spill_reservation_bytes 10485760 Specifies the
reserved memory for each spillable sort operation to facilitate an in-memory
merge. When a sort operation spills to disk, the in-memory data must be sorted
and merged before being written to a file. This setting reserves a specific
amount of memory for that in-memory sort/merge process. Note: This setting is
irrelevant if the sort operation cannot spill (i.e., if there's no
`DiskManager` configured).
datafusion.execution.target_partitions 7 Number of partitions for query
execution. Increasing partitions can increase concurrency. Defaults to the
number of CPU cores on the system
diff --git a/datafusion/sqllogictest/test_files/insert.slt
b/datafusion/sqllogictest/test_files/insert.slt
index 74968bb089..cc04c62277 100644
--- a/datafusion/sqllogictest/test_files/insert.slt
+++ b/datafusion/sqllogictest/test_files/insert.slt
@@ -126,13 +126,14 @@ Dml: op=[Insert Into] table=[table_without_values]
------TableScan: aggregate_test_100 projection=[c1, c4, c9]
physical_plan
InsertExec: sink=MemoryTable (partitions=1)
---ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as field1, COUNT(*) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as field2]
-----BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name:
"SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered:
false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: [...]
-------SortExec: expr=[c1@0 ASC NULLS LAST,c9@2 ASC NULLS LAST]
---------CoalesceBatchesExec: target_batch_size=8192
-----------RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8
-------------RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1
---------------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1,
c4, c9], has_header=true
+--CoalescePartitionsExec
+----ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as field1, COUNT(*) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as field2]
+------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name:
"SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered:
false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound [...]
+--------SortExec: expr=[c1@0 ASC NULLS LAST,c9@2 ASC NULLS LAST]
+----------CoalesceBatchesExec: target_batch_size=8192
+------------RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8
+--------------RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1
+----------------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1,
c4, c9], has_header=true
diff --git a/datafusion/sqllogictest/test_files/insert_to_external.slt
b/datafusion/sqllogictest/test_files/insert_to_external.slt
index d1b73204e3..abbfa304be 100644
--- a/datafusion/sqllogictest/test_files/insert_to_external.slt
+++ b/datafusion/sqllogictest/test_files/insert_to_external.slt
@@ -102,11 +102,18 @@ INSERT INTO single_file_test values (1, 2), (3, 4);
----
2
+query II
+INSERT INTO single_file_test values (4, 5), (6, 7);
+----
+2
+
query II
select * from single_file_test;
----
1 2
3 4
+4 5
+6 7
statement ok
CREATE EXTERNAL TABLE
@@ -215,13 +222,14 @@ Dml: op=[Insert Into] table=[table_without_values]
------TableScan: aggregate_test_100 projection=[c1, c4, c9]
physical_plan
InsertExec: sink=ParquetSink(writer_mode=PutMultipart, file_groups=[])
---ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as field1, COUNT(*) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as field2]
-----BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name:
"SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered:
false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: [...]
-------SortExec: expr=[c1@0 ASC NULLS LAST,c9@2 ASC NULLS LAST]
---------CoalesceBatchesExec: target_batch_size=8192
-----------RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8
-------------RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1
---------------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1,
c4, c9], has_header=true
+--CoalescePartitionsExec
+----ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as field1, COUNT(*) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as field2]
+------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name:
"SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered:
false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound [...]
+--------SortExec: expr=[c1@0 ASC NULLS LAST,c9@2 ASC NULLS LAST]
+----------CoalesceBatchesExec: target_batch_size=8192
+------------RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8
+--------------RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1
+----------------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1,
c4, c9], has_header=true
diff --git a/docs/source/user-guide/configs.md
b/docs/source/user-guide/configs.md
index cab1e5c3e4..a0451eed08 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -77,6 +77,9 @@ Environment variables are read during `SessionConfig`
initialisation so they mus
| datafusion.execution.sort_spill_reservation_bytes | 10485760
| Specifies the reserved memory for each spillable sort operation to
facilitate an in-memory merge. When a sort operation spills to disk, the
in-memory data must be sorted and merged before being written to a file. This
setting reserves a specific amount of memory for that in-memory sort/merge
process. Note: This setting is irrelevant if the sort operation cannot spill
(i.e., if there's no `DiskManag [...]
| datafusion.execution.sort_in_place_threshold_bytes | 1048576
| When sorting, below what size should data be concatenated and
sorted in a single RecordBatch rather than sorted in batches and merged.
[...]
| datafusion.execution.meta_fetch_concurrency | 32
| Number of files to read in parallel when inferring schema and
statistics
[...]
+| datafusion.execution.soft_max_rows_per_output_file | 50000000
| Target number of rows in output files when writing multiple. This
is a soft max, so it can be exceeded slightly. There also will be one file
smaller than the limit if the total number of rows written is not roughly
divisible by the soft max
[...]
+| datafusion.execution.max_parallel_ouput_files | 8
| This is the maximum number of output files being written in
parallel. Higher values can potentially give faster write performance at the
cost of higher peak memory consumption.
[...]
+| datafusion.execution.max_buffered_batches_per_output_file | 2
| This is the maximum number of RecordBatches buffered for each
output file being worked. Higher values can potentially give faster write
performance at the cost of higher peak memory consumption
[...]
| datafusion.optimizer.enable_round_robin_repartition | true
| When set to true, the physical plan optimizer will try to add round
robin repartitioning to increase parallelism to leverage more CPU cores
[...]
| datafusion.optimizer.enable_topk_aggregation | true
| When set to true, the optimizer will attempt to perform limit
operations during aggregations, if possible
[...]
| datafusion.optimizer.filter_null_join_keys | false
| When set to true, the optimizer will insert filters before a join
between a nullable and non-nullable column to filter out nulls on the nullable
side. This filter can add additional overhead when the file format does not
fully support predicate push down.
[...]