This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new ed85abbb87 Extend insert into to support Parquet backed tables (#7244)
ed85abbb87 is described below
commit ed85abbb878ef3d60e43797376cb9a40955cd89a
Author: Devin D'Angelo <[email protected]>
AuthorDate: Sun Aug 13 07:12:02 2023 -0400
Extend insert into to support Parquet backed tables (#7244)
* initial AsyncArrowWriter implementation
* clean up
* fix information_schema test
* refactor code to new write.rs mod
* remove config dependency on parquet crate
* fmt
* finish implementing session write configs and parsing
* fix ndv doc
* rebase resolve conflicts
* split up test_string_expressions into 2 tests to avoid stack overflow error
* add comments explaining test split
---
datafusion/common/src/config.rs | 64 +++-
datafusion/core/src/datasource/file_format/csv.rs | 7 +-
datafusion/core/src/datasource/file_format/json.rs | 7 +-
datafusion/core/src/datasource/file_format/mod.rs | 344 +-------------------
.../core/src/datasource/file_format/options.rs | 37 ++-
.../core/src/datasource/file_format/parquet.rs | 352 ++++++++++++++++++++-
.../datasource/file_format/{mod.rs => write.rs} | 332 +++----------------
datafusion/core/src/datasource/listing/table.rs | 175 ++++++++--
.../src/datasource/physical_plan/file_stream.rs | 2 +-
.../core/src/datasource/physical_plan/mod.rs | 2 +-
datafusion/core/src/execution/context.rs | 10 +-
datafusion/core/tests/sql/expr.rs | 11 +-
.../test_files/information_schema.slt | 16 +
docs/source/user-guide/configs.md | 104 +++---
14 files changed, 746 insertions(+), 717 deletions(-)
diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index fe7fb95503..41c2657e1c 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -255,7 +255,7 @@ config_namespace! {
}
config_namespace! {
- /// Options related to reading of parquet files
+ /// Options related to parquet files
pub struct ParquetOptions {
/// If true, reads the Parquet data page level metadata (the
/// Page Index), if present, to reduce the I/O and number of
@@ -286,6 +286,66 @@ config_namespace! {
/// will be reordered heuristically to minimize the cost of
evaluation. If false,
/// the filters are applied in the same order as written in the query
pub reorder_filters: bool, default = false
+
+ // The following map to parquet::file::properties::WriterProperties
+
+ /// Sets best effort maximum size of data page in bytes
+ pub data_pagesize_limit: usize, default = 1024 * 1024
+
+ /// Sets write_batch_size in bytes
+ pub write_batch_size: usize, default = 1024
+
+ /// Sets parquet writer version
+ /// valid values are "1.0" and "2.0"
+ pub writer_version: String, default = "1.0".into()
+
+ /// Sets default parquet compression codec
+ /// Valid values are: uncompressed, snappy, gzip(level),
+ /// lzo, brotli(level), lz4, zstd(level), and lz4_raw.
+ /// These values are not case sensitive.
+ pub compression: String, default = "snappy".into()
+
+ /// Sets if dictionary encoding is enabled
+ pub dictionary_enabled: bool, default = true
+
+ /// Sets best effort maximum dictionary page size, in bytes
+ pub dictionary_page_size_limit: usize, default = 1024 * 1024
+
+ /// Sets if statistics are enabled for any column
+ /// Valid values are: "none", "chunk", and "page"
+ /// These values are not case sensitive.
+ pub statistics_enabled: String, default = "page".into()
+
+ /// Sets max statistics size for any column
+ pub max_statistics_size: usize, default = 4096
+
+ /// Sets maximum number of rows in a row group
+ pub max_row_group_size: usize, default = 1024 * 1024
+
+ /// Sets "created by" property
+ pub created_by: String, default = concat!("datafusion version ",
env!("CARGO_PKG_VERSION")).into()
+
+ /// Sets column index truncate length
+ pub column_index_truncate_length: Option<usize>, default = None
+
+ /// Sets best effort maximum number of rows in data page
+ pub data_page_row_count_limit: usize, default = usize::MAX
+
+ /// Sets default encoding for any column
+ /// Valid values are: plain, plain_dictionary, rle,
+ /// bit_packed, delta_binary_packed, delta_length_byte_array,
+ /// delta_byte_array, rle_dictionary, and byte_stream_split.
+ /// These values are not case sensitive.
+ pub encoding: String, default = "plain".into()
+
+ /// Sets if bloom filter is enabled for any column
+ pub bloom_filter_enabled: bool, default = false
+
+ /// Sets bloom filter false positive probability
+ pub bloom_filter_fpp: f64, default = 0.05
+
+ /// Sets bloom filter number of distinct values
+ pub bloom_filter_ndv: u64, default = 1_000_000_u64
}
}
@@ -745,6 +805,8 @@ macro_rules! config_field {
config_field!(String);
config_field!(bool);
config_field!(usize);
+config_field!(f64);
+config_field!(u64);
/// An implementation trait used to recursively walk configuration
trait Visit {
diff --git a/datafusion/core/src/datasource/file_format/csv.rs
b/datafusion/core/src/datasource/file_format/csv.rs
index 8f56bf139e..32fbf03b58 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -37,10 +37,11 @@ use futures::stream::BoxStream;
use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
use object_store::{delimited::newline_delimited_stream, ObjectMeta,
ObjectStore};
-use super::{create_writer, stateless_serialize_and_write_files, FileFormat};
+use super::{FileFormat, DEFAULT_SCHEMA_INFER_MAX_RECORD};
use crate::datasource::file_format::file_type::FileCompressionType;
-use crate::datasource::file_format::FileWriterMode;
-use crate::datasource::file_format::{BatchSerializer,
DEFAULT_SCHEMA_INFER_MAX_RECORD};
+use crate::datasource::file_format::write::{
+ create_writer, stateless_serialize_and_write_files, BatchSerializer,
FileWriterMode,
+};
use crate::datasource::physical_plan::{
CsvExec, FileGroupDisplay, FileScanConfig, FileSinkConfig,
};
diff --git a/datafusion/core/src/datasource/file_format/json.rs
b/datafusion/core/src/datasource/file_format/json.rs
index 6856ad89ea..8472f4e5c1 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -47,13 +47,12 @@ use crate::physical_plan::insert::InsertExec;
use crate::physical_plan::SendableRecordBatchStream;
use crate::physical_plan::{DisplayAs, DisplayFormatType, Statistics};
-use super::create_writer;
-use super::stateless_serialize_and_write_files;
-use super::BatchSerializer;
use super::FileFormat;
use super::FileScanConfig;
-use super::FileWriterMode;
use crate::datasource::file_format::file_type::FileCompressionType;
+use crate::datasource::file_format::write::{
+ create_writer, stateless_serialize_and_write_files, BatchSerializer,
FileWriterMode,
+};
use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
use crate::datasource::physical_plan::FileSinkConfig;
use crate::datasource::physical_plan::NdJsonExec;
diff --git a/datafusion/core/src/datasource/file_format/mod.rs
b/datafusion/core/src/datasource/file_format/mod.rs
index 42b16656fb..9eec11f224 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -16,6 +16,7 @@
// under the License.
//! Module containing helper methods for the various file formats
+//! See write.rs for write related helper methods
/// Default max records to scan to infer the schema
pub const DEFAULT_SCHEMA_INFER_MAX_RECORD: usize = 1000;
@@ -27,36 +28,24 @@ pub mod file_type;
pub mod json;
pub mod options;
pub mod parquet;
+pub mod write;
use std::any::Any;
-use std::io::Error;
-use std::pin::Pin;
+use std::fmt;
use std::sync::Arc;
-use std::task::{Context, Poll};
-use std::{fmt, mem};
use crate::arrow::datatypes::SchemaRef;
use crate::datasource::physical_plan::{FileScanConfig, FileSinkConfig};
use crate::error::Result;
use crate::execution::context::SessionState;
-use crate::physical_plan::{ExecutionPlan, SendableRecordBatchStream,
Statistics};
+use crate::physical_plan::{ExecutionPlan, Statistics};
-use arrow_array::RecordBatch;
use datafusion_common::DataFusionError;
use datafusion_physical_expr::PhysicalExpr;
use async_trait::async_trait;
-use bytes::Bytes;
-use futures::future::BoxFuture;
-use futures::FutureExt;
-use futures::{ready, StreamExt};
-use object_store::path::Path;
-use object_store::{MultipartId, ObjectMeta, ObjectStore};
-use tokio::io::{AsyncWrite, AsyncWriteExt};
+use object_store::{ObjectMeta, ObjectStore};
-use self::file_type::FileCompressionType;
-
-use super::physical_plan::FileMeta;
/// This trait abstracts all the file format specific implementations
/// from the [`TableProvider`]. This helps code re-utilization across
/// providers that support the the same file formats.
@@ -116,329 +105,6 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
}
}
-/// `AsyncPutWriter` is an object that facilitates asynchronous writing to
object stores.
-/// It is specifically designed for the `object_store` crate's `put` method
and sends
-/// whole bytes at once when the buffer is flushed.
-pub struct AsyncPutWriter {
- /// Object metadata
- object_meta: ObjectMeta,
- /// A shared reference to the object store
- store: Arc<dyn ObjectStore>,
- /// A buffer that stores the bytes to be sent
- current_buffer: Vec<u8>,
- /// Used for async handling in flush method
- inner_state: AsyncPutState,
-}
-
-impl AsyncPutWriter {
- /// Constructor for the `AsyncPutWriter` object
- pub fn new(object_meta: ObjectMeta, store: Arc<dyn ObjectStore>) -> Self {
- Self {
- object_meta,
- store,
- current_buffer: vec![],
- // The writer starts out in buffering mode
- inner_state: AsyncPutState::Buffer,
- }
- }
-
- /// Separate implementation function that unpins the [`AsyncPutWriter`] so
- /// that partial borrows work correctly
- fn poll_shutdown_inner(
- &mut self,
- cx: &mut Context<'_>,
- ) -> Poll<std::result::Result<(), Error>> {
- loop {
- match &mut self.inner_state {
- AsyncPutState::Buffer => {
- // Convert the current buffer to bytes and take ownership
of it
- let bytes = Bytes::from(mem::take(&mut
self.current_buffer));
- // Set the inner state to Put variant with the bytes
- self.inner_state = AsyncPutState::Put { bytes }
- }
- AsyncPutState::Put { bytes } => {
- // Send the bytes to the object store's put method
- return Poll::Ready(
- ready!(self
- .store
- .put(&self.object_meta.location, bytes.clone())
- .poll_unpin(cx))
- .map_err(Error::from),
- );
- }
- }
- }
- }
-}
-
-/// An enum that represents the inner state of AsyncPut
-enum AsyncPutState {
- /// Building Bytes struct in this state
- Buffer,
- /// Data in the buffer is being sent to the object store
- Put { bytes: Bytes },
-}
-
-impl AsyncWrite for AsyncPutWriter {
- // Define the implementation of the AsyncWrite trait for the
`AsyncPutWriter` struct
- fn poll_write(
- mut self: Pin<&mut Self>,
- _: &mut Context<'_>,
- buf: &[u8],
- ) -> Poll<std::result::Result<usize, Error>> {
- // Extend the current buffer with the incoming buffer
- self.current_buffer.extend_from_slice(buf);
- // Return a ready poll with the length of the incoming buffer
- Poll::Ready(Ok(buf.len()))
- }
-
- fn poll_flush(
- self: Pin<&mut Self>,
- _: &mut Context<'_>,
- ) -> Poll<std::result::Result<(), Error>> {
- // Return a ready poll with an empty result
- Poll::Ready(Ok(()))
- }
-
- fn poll_shutdown(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<std::result::Result<(), Error>> {
- // Call the poll_shutdown_inner method to handle the actual sending of
data to the object store
- self.poll_shutdown_inner(cx)
- }
-}
-
-/// Stores data needed during abortion of MultiPart writers
-pub(crate) struct MultiPart {
- /// A shared reference to the object store
- store: Arc<dyn ObjectStore>,
- multipart_id: MultipartId,
- location: Path,
-}
-
-impl MultiPart {
- /// Create a new `MultiPart`
- pub fn new(
- store: Arc<dyn ObjectStore>,
- multipart_id: MultipartId,
- location: Path,
- ) -> Self {
- Self {
- store,
- multipart_id,
- location,
- }
- }
-}
-
-pub(crate) enum AbortMode {
- Put,
- Append,
- MultiPart(MultiPart),
-}
-
-/// A wrapper struct with abort method and writer
-pub(crate) struct AbortableWrite<W: AsyncWrite + Unpin + Send> {
- writer: W,
- mode: AbortMode,
-}
-
-impl<W: AsyncWrite + Unpin + Send> AbortableWrite<W> {
- /// Create a new `AbortableWrite` instance with the given writer, and
write mode.
- fn new(writer: W, mode: AbortMode) -> Self {
- Self { writer, mode }
- }
-
- /// handling of abort for different write modes
- fn abort_writer(&self) -> Result<BoxFuture<'static, Result<()>>> {
- match &self.mode {
- AbortMode::Put => Ok(async { Ok(()) }.boxed()),
- AbortMode::Append => Err(DataFusionError::Execution(
- "Cannot abort in append mode".to_string(),
- )),
- AbortMode::MultiPart(MultiPart {
- store,
- multipart_id,
- location,
- }) => {
- let location = location.clone();
- let multipart_id = multipart_id.clone();
- let store = store.clone();
- Ok(Box::pin(async move {
- store
- .abort_multipart(&location, &multipart_id)
- .await
- .map_err(DataFusionError::ObjectStore)
- }))
- }
- }
- }
-}
-
-impl<W: AsyncWrite + Unpin + Send> AsyncWrite for AbortableWrite<W> {
- fn poll_write(
- self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- buf: &[u8],
- ) -> Poll<std::result::Result<usize, Error>> {
- Pin::new(&mut self.get_mut().writer).poll_write(cx, buf)
- }
-
- fn poll_flush(
- self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<std::result::Result<(), Error>> {
- Pin::new(&mut self.get_mut().writer).poll_flush(cx)
- }
-
- fn poll_shutdown(
- self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<std::result::Result<(), Error>> {
- Pin::new(&mut self.get_mut().writer).poll_shutdown(cx)
- }
-}
-
-/// An enum that defines different file writer modes.
-#[derive(Debug, Clone, Copy)]
-pub enum FileWriterMode {
- /// Data is appended to an existing file.
- Append,
- /// Data is written to a new file.
- Put,
- /// Data is written to a new file in multiple parts.
- PutMultipart,
-}
-
-/// Returns an [`AbortableWrite`] which writes to the given object store
location
-/// with the specified compression
-pub(crate) async fn create_writer(
- writer_mode: FileWriterMode,
- file_compression_type: FileCompressionType,
- file_meta: FileMeta,
- object_store: Arc<dyn ObjectStore>,
-) -> Result<AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>> {
- let object = &file_meta.object_meta;
- match writer_mode {
- // If the mode is append, call the store's append method and return
wrapped in
- // a boxed trait object.
- FileWriterMode::Append => {
- let writer = object_store
- .append(&object.location)
- .await
- .map_err(DataFusionError::ObjectStore)?;
- let writer = AbortableWrite::new(
- file_compression_type.convert_async_writer(writer)?,
- AbortMode::Append,
- );
- Ok(writer)
- }
- // If the mode is put, create a new AsyncPut writer and return it
wrapped in
- // a boxed trait object
- FileWriterMode::Put => {
- let writer = Box::new(AsyncPutWriter::new(object.clone(),
object_store));
- let writer = AbortableWrite::new(
- file_compression_type.convert_async_writer(writer)?,
- AbortMode::Put,
- );
- Ok(writer)
- }
- // If the mode is put multipart, call the store's put_multipart method
and
- // return the writer wrapped in a boxed trait object.
- FileWriterMode::PutMultipart => {
- let (multipart_id, writer) = object_store
- .put_multipart(&object.location)
- .await
- .map_err(DataFusionError::ObjectStore)?;
- Ok(AbortableWrite::new(
- file_compression_type.convert_async_writer(writer)?,
- AbortMode::MultiPart(MultiPart::new(
- object_store,
- multipart_id,
- object.location.clone(),
- )),
- ))
- }
- }
-}
-
-/// A trait that defines the methods required for a RecordBatch serializer.
-#[async_trait]
-pub trait BatchSerializer: Unpin + Send {
- /// Asynchronously serializes a `RecordBatch` and returns the serialized
bytes.
- async fn serialize(&mut self, batch: RecordBatch) -> Result<Bytes>;
-}
-
-/// Checks if any of the passed writers have encountered an error
-/// and if so, all writers are aborted.
-async fn check_for_errors<T, W: AsyncWrite + Unpin + Send>(
- result: Result<T>,
- writers: &mut [AbortableWrite<W>],
-) -> Result<T> {
- match result {
- Ok(value) => Ok(value),
- Err(e) => {
- // Abort all writers before returning the error:
- for writer in writers {
- let mut abort_future = writer.abort_writer();
- if let Ok(abort_future) = &mut abort_future {
- let _ = abort_future.await;
- }
- // Ignore errors that occur during abortion,
- // We do try to abort all writers before returning error.
- }
- // After aborting writers return original error.
- Err(e)
- }
- }
-}
-
-/// Contains the common logic for serializing RecordBatches and
-/// writing the resulting bytes to an ObjectStore.
-/// Serialization is assumed to be stateless, i.e.
-/// each RecordBatch can be serialized without any
-/// dependency on the RecordBatches before or after.
-async fn stateless_serialize_and_write_files(
- mut data: Vec<SendableRecordBatchStream>,
- mut serializers: Vec<Box<dyn BatchSerializer>>,
- mut writers: Vec<AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>>,
-) -> Result<u64> {
- let num_partitions = data.len();
- let mut row_count = 0;
- // Map errors to DatafusionError.
- let err_converter =
- |_| DataFusionError::Internal("Unexpected FileSink Error".to_string());
- // TODO parallelize serialization accross partitions and batches within
partitions
- // see: https://github.com/apache/arrow-datafusion/issues/7079
- for idx in 0..num_partitions {
- while let Some(maybe_batch) = data[idx].next().await {
- // Write data to files in a round robin fashion:
- let serializer = &mut serializers[idx];
- let batch = check_for_errors(maybe_batch, &mut writers).await?;
- row_count += batch.num_rows();
- let bytes =
- check_for_errors(serializer.serialize(batch).await, &mut
writers).await?;
- let writer = &mut writers[idx];
- check_for_errors(
- writer.write_all(&bytes).await.map_err(err_converter),
- &mut writers,
- )
- .await?;
- }
- }
- // Perform cleanup:
- let n_writers = writers.len();
- for idx in 0..n_writers {
- check_for_errors(
- writers[idx].shutdown().await.map_err(err_converter),
- &mut writers,
- )
- .await?;
- }
- Ok(row_count as u64)
-}
-
#[cfg(test)]
pub(crate) mod test_util {
use std::ops::Range;
diff --git a/datafusion/core/src/datasource/file_format/options.rs
b/datafusion/core/src/datasource/file_format/options.rs
index 73c20d3b0c..d8168070eb 100644
--- a/datafusion/core/src/datasource/file_format/options.rs
+++ b/datafusion/core/src/datasource/file_format/options.rs
@@ -214,6 +214,13 @@ pub struct ParquetReadOptions<'a> {
///
/// If None specified, uses value in SessionConfig
pub skip_metadata: Option<bool>,
+ /// An optional schema representing the parquet files. If None, parquet
reader will try to infer it
+ /// based on data in file.
+ pub schema: Option<&'a Schema>,
+ /// Indicates how the file is sorted
+ pub file_sort_order: Vec<Vec<Expr>>,
+ /// Setting controls how inserts to this file should be handled
+ pub insert_mode: ListingTableInsertMode,
}
impl<'a> Default for ParquetReadOptions<'a> {
@@ -223,6 +230,9 @@ impl<'a> Default for ParquetReadOptions<'a> {
table_partition_cols: vec![],
parquet_pruning: None,
skip_metadata: None,
+ schema: None,
+ file_sort_order: vec![],
+ insert_mode: ListingTableInsertMode::AppendNewFiles,
}
}
}
@@ -242,6 +252,12 @@ impl<'a> ParquetReadOptions<'a> {
self
}
+ /// Specify schema to use for parquet read
+ pub fn schema(mut self, schema: &'a Schema) -> Self {
+ self.schema = Some(schema);
+ self
+ }
+
/// Specify table_partition_cols for partition pruning
pub fn table_partition_cols(
mut self,
@@ -250,6 +266,18 @@ impl<'a> ParquetReadOptions<'a> {
self.table_partition_cols = table_partition_cols;
self
}
+
+ /// Configure if file has known sort order
+ pub fn file_sort_order(mut self, file_sort_order: Vec<Vec<Expr>>) -> Self {
+ self.file_sort_order = file_sort_order;
+ self
+ }
+
+ /// Configure how insertions to this table should be handled
+ pub fn insert_mode(mut self, insert_mode: ListingTableInsertMode) -> Self {
+ self.insert_mode = insert_mode;
+ self
+ }
}
/// Options that control the reading of ARROW files.
@@ -525,6 +553,8 @@ impl ReadOptions<'_> for ParquetReadOptions<'_> {
.with_file_extension(self.file_extension)
.with_target_partitions(config.target_partitions())
.with_table_partition_cols(self.table_partition_cols.clone())
+ .with_file_sort_order(self.file_sort_order.clone())
+ .with_insert_mode(self.insert_mode.clone())
}
async fn get_resolved_schema(
@@ -533,11 +563,8 @@ impl ReadOptions<'_> for ParquetReadOptions<'_> {
state: SessionState,
table_path: ListingTableUrl,
) -> Result<SchemaRef> {
- // with parquet we resolve the schema in all cases
- Ok(self
- .to_listing_options(config)
- .infer_schema(&state, &table_path)
- .await?)
+ self._get_resolved_schema(config, state, table_path, self.schema,
false)
+ .await
}
}
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs
b/datafusion/core/src/datasource/file_format/parquet.rs
index 6cda0fe68b..c35f59dd4a 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -17,23 +17,31 @@
//! Parquet format abstractions
+use parquet::basic::{BrotliLevel, GzipLevel, ZstdLevel};
+use rand::distributions::DistString;
use std::any::Any;
+use std::fmt;
+use std::fmt::Debug;
use std::sync::Arc;
use arrow::datatypes::SchemaRef;
use arrow::datatypes::{Fields, Schema};
use async_trait::async_trait;
use bytes::{BufMut, BytesMut};
-use datafusion_common::DataFusionError;
+use datafusion_common::{plan_err, DataFusionError};
+use datafusion_execution::TaskContext;
use datafusion_physical_expr::PhysicalExpr;
use futures::{StreamExt, TryStreamExt};
use hashbrown::HashMap;
use object_store::{ObjectMeta, ObjectStore};
-use parquet::arrow::parquet_to_arrow_schema;
+use parquet::arrow::{parquet_to_arrow_schema, AsyncArrowWriter};
use parquet::file::footer::{decode_footer, decode_metadata};
use parquet::file::metadata::ParquetMetaData;
+use parquet::file::properties::{EnabledStatistics, WriterProperties,
WriterVersion};
use parquet::file::statistics::Statistics as ParquetStatistics;
+use rand::distributions::Alphanumeric;
+use super::write::FileWriterMode;
use super::FileFormat;
use super::FileScanConfig;
use crate::arrow::array::{
@@ -42,12 +50,18 @@ use crate::arrow::array::{
use crate::arrow::datatypes::DataType;
use crate::config::ConfigOptions;
-use crate::datasource::physical_plan::{ParquetExec, SchemaAdapter};
+use crate::datasource::physical_plan::{
+ FileGroupDisplay, FileMeta, FileSinkConfig, ParquetExec, SchemaAdapter,
+};
use crate::datasource::{create_max_min_accs, get_col_stats};
use crate::error::Result;
use crate::execution::context::SessionState;
use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
-use crate::physical_plan::{Accumulator, ExecutionPlan, Statistics};
+use crate::physical_plan::insert::{DataSink, InsertExec};
+use crate::physical_plan::{
+ Accumulator, DisplayAs, DisplayFormatType, ExecutionPlan,
SendableRecordBatchStream,
+ Statistics,
+};
/// The default file extension of parquet files
pub const DEFAULT_PARQUET_EXTENSION: &str = ".parquet";
@@ -208,6 +222,24 @@ impl FileFormat for ParquetFormat {
self.metadata_size_hint(state.config_options()),
)))
}
+
+ async fn create_writer_physical_plan(
+ &self,
+ input: Arc<dyn ExecutionPlan>,
+ _state: &SessionState,
+ conf: FileSinkConfig,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ if conf.overwrite {
+ return Err(DataFusionError::NotImplemented(
+ "Overwrites are not implemented yet for Parquet".into(),
+ ));
+ }
+
+ let sink_schema = conf.output_schema().clone();
+ let sink = Arc::new(ParquetSink::new(conf));
+
+ Ok(Arc::new(InsertExec::new(input, sink, sink_schema)) as _)
+ }
}
fn summarize_min_max(
@@ -543,6 +575,318 @@ async fn fetch_statistics(
Ok(statistics)
}
+/// Implements [`DataSink`] for writing to a parquet file.
+struct ParquetSink {
+ /// Config options for writing data
+ config: FileSinkConfig,
+}
+
+impl Debug for ParquetSink {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("ParquetSink").finish()
+ }
+}
+
+impl DisplayAs for ParquetSink {
+ fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) ->
fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ write!(
+ f,
+ "ParquetSink(writer_mode={:?}, file_groups=",
+ self.config.writer_mode
+ )?;
+ FileGroupDisplay(&self.config.file_groups).fmt_as(t, f)?;
+ write!(f, ")")
+ }
+ }
+ }
+}
+
+/// Parses datafusion.execution.parquet.encoding String to a
parquet::basic::Encoding
+fn parse_encoding_string(str_setting: &str) ->
Result<parquet::basic::Encoding> {
+ let str_setting_lower: &str = &str_setting.to_lowercase();
+ match str_setting_lower {
+ "plain" => Ok(parquet::basic::Encoding::PLAIN),
+ "plain_dictionary" => Ok(parquet::basic::Encoding::PLAIN_DICTIONARY),
+ "rle" => Ok(parquet::basic::Encoding::RLE),
+ "bit_packed" => Ok(parquet::basic::Encoding::BIT_PACKED),
+ "delta_binary_packed" =>
Ok(parquet::basic::Encoding::DELTA_BINARY_PACKED),
+ "delta_length_byte_array" => {
+ Ok(parquet::basic::Encoding::DELTA_LENGTH_BYTE_ARRAY)
+ }
+ "delta_byte_array" => Ok(parquet::basic::Encoding::DELTA_BYTE_ARRAY),
+ "rle_dictionary" => Ok(parquet::basic::Encoding::RLE_DICTIONARY),
+ "byte_stream_split" => Ok(parquet::basic::Encoding::BYTE_STREAM_SPLIT),
+ _ => Err(DataFusionError::Plan(format!(
+ "Unknown or unsupported parquet encoding: \
+ {str_setting}. Valid values are: plain, plain_dictionary, rle, \
+ /// bit_packed, delta_binary_packed, delta_length_byte_array, \
+ /// delta_byte_array, rle_dictionary, and byte_stream_split."
+ ))),
+ }
+}
+
+/// Splits compression string into compression codec and optional
compression_level
+/// I.e. gzip(2) -> gzip, 2
+fn split_compression_string(str_setting: &str) -> Result<(&str, Option<u32>)> {
+ let split_setting = str_setting.split_once('(');
+
+ match split_setting {
+ Some((codec, rh)) => {
+ let level = &rh[..rh.len() - 1].parse::<u32>().map_err(|_| {
+ DataFusionError::Plan(format!(
+ "Could not parse compression string. \
+ Got codec: {} and unknown level from {}",
+ codec, str_setting
+ ))
+ })?;
+ Ok((codec, Some(*level)))
+ }
+ None => Ok((str_setting, None)),
+ }
+}
+
+/// Helper to ensure compression codecs which don't support levels
+/// don't have one set. E.g. snappy(2) is invalid.
+fn check_level_is_none(codec: &str, level: &Option<u32>) -> Result<()> {
+ if level.is_some() {
+ return Err(DataFusionError::Plan(format!(
+ "Compression {codec} does not support specifying a level"
+ )));
+ }
+ Ok(())
+}
+
+/// Helper to ensure compression codecs which require a level
+/// do have one set. E.g. zstd is invalid, zstd(3) is valid
+fn require_level(codec: &str, level: Option<u32>) -> Result<u32> {
+ level.ok_or(DataFusionError::Plan(format!(
+ "{codec} compression requires specifying a level such as {codec}(4)"
+ )))
+}
+
+/// Parses datafusion.execution.parquet.compression String to a
parquet::basic::Compression
+fn parse_compression_string(str_setting: &str) ->
Result<parquet::basic::Compression> {
+ let str_setting_lower: &str = &str_setting.to_lowercase();
+ let (codec, level) = split_compression_string(str_setting_lower)?;
+ match codec {
+ "uncompressed" => {
+ check_level_is_none(codec, &level)?;
+ Ok(parquet::basic::Compression::UNCOMPRESSED)
+ }
+ "snappy" => {
+ check_level_is_none(codec, &level)?;
+ Ok(parquet::basic::Compression::SNAPPY)
+ }
+ "gzip" => {
+ let level = require_level(codec, level)?;
+ Ok(parquet::basic::Compression::GZIP(GzipLevel::try_new(
+ level,
+ )?))
+ }
+ "lzo" => {
+ check_level_is_none(codec, &level)?;
+ Ok(parquet::basic::Compression::LZO)
+ }
+ "brotli" => {
+ let level = require_level(codec, level)?;
+ Ok(parquet::basic::Compression::BROTLI(BrotliLevel::try_new(
+ level,
+ )?))
+ }
+ "lz4" => {
+ check_level_is_none(codec, &level)?;
+ Ok(parquet::basic::Compression::LZ4)
+ }
+ "zstd" => {
+ let level = require_level(codec, level)?;
+ Ok(parquet::basic::Compression::ZSTD(ZstdLevel::try_new(
+ level as i32,
+ )?))
+ }
+ "lz4_raw" => {
+ check_level_is_none(codec, &level)?;
+ Ok(parquet::basic::Compression::LZ4_RAW)
+ }
+ _ => Err(DataFusionError::Plan(format!(
+ "Unknown or unsupported parquet compression: \
+ {str_setting}. Valid values are: uncompressed, snappy, gzip(level), \
+ lzo, brotli(level), lz4, zstd(level), and lz4_raw."
+ ))),
+ }
+}
+
+fn parse_version_string(str_setting: &str) -> Result<WriterVersion> {
+ let str_setting_lower: &str = &str_setting.to_lowercase();
+ match str_setting_lower {
+ "1.0" => Ok(WriterVersion::PARQUET_1_0),
+ "2.0" => Ok(WriterVersion::PARQUET_2_0),
+ _ => Err(DataFusionError::Plan(format!(
+ "Unknown or unsupported parquet writer version {str_setting} \
+ valid options are '1.0' and '2.0'"
+ ))),
+ }
+}
+
+fn parse_statistics_string(str_setting: &str) -> Result<EnabledStatistics> {
+ let str_setting_lower: &str = &str_setting.to_lowercase();
+ match str_setting_lower {
+ "none" => Ok(EnabledStatistics::None),
+ "chunk" => Ok(EnabledStatistics::Chunk),
+ "page" => Ok(EnabledStatistics::Page),
+ _ => Err(DataFusionError::Plan(format!(
+ "Unknown or unsupported parquet statistics setting {str_setting} \
+ valid options are 'none', 'page', and 'chunk'"
+ ))),
+ }
+}
+
+impl ParquetSink {
+ fn new(config: FileSinkConfig) -> Self {
+ Self { config }
+ }
+
+ /// Builds a parquet WriterProperties struct, setting options as
appropriate from TaskContext options.
+ /// May return error if SessionContext contains invalid or unsupported
options
+ fn parquet_writer_props_from_context(
+ &self,
+ context: &Arc<TaskContext>,
+ ) -> Result<WriterProperties> {
+ let parquet_context =
&context.session_config().options().execution.parquet;
+ Ok(WriterProperties::builder()
+ .set_data_page_size_limit(parquet_context.data_pagesize_limit)
+ .set_write_batch_size(parquet_context.write_batch_size)
+
.set_writer_version(parse_version_string(&parquet_context.writer_version)?)
+
.set_compression(parse_compression_string(&parquet_context.compression)?)
+ .set_dictionary_enabled(parquet_context.dictionary_enabled)
+
.set_dictionary_page_size_limit(parquet_context.dictionary_page_size_limit)
+ .set_statistics_enabled(parse_statistics_string(
+ &parquet_context.statistics_enabled,
+ )?)
+ .set_max_statistics_size(parquet_context.max_statistics_size)
+ .set_max_row_group_size(parquet_context.max_row_group_size)
+ .set_created_by(parquet_context.created_by.clone())
+ .set_column_index_truncate_length(
+ parquet_context.column_index_truncate_length,
+ )
+
.set_data_page_row_count_limit(parquet_context.data_page_row_count_limit)
+ .set_encoding(parse_encoding_string(&parquet_context.encoding)?)
+ .set_bloom_filter_enabled(parquet_context.bloom_filter_enabled)
+ .set_bloom_filter_fpp(parquet_context.bloom_filter_fpp)
+ .set_bloom_filter_ndv(parquet_context.bloom_filter_ndv)
+ .build())
+ }
+
+ // Create a write for parquet files
+ async fn create_writer(
+ &self,
+ file_meta: FileMeta,
+ object_store: Arc<dyn ObjectStore>,
+ parquet_props: WriterProperties,
+ ) -> Result<
+ AsyncArrowWriter<Box<dyn tokio::io::AsyncWrite + std::marker::Send +
Unpin>>,
+ > {
+ let object = &file_meta.object_meta;
+ match self.config.writer_mode {
+ FileWriterMode::Append => {
+ plan_err!(
+ "Appending to Parquet files is not supported by the file
format!"
+ )
+ }
+ FileWriterMode::Put => Err(DataFusionError::NotImplemented(
+ "FileWriterMode::Put is not implemented for
ParquetSink".into(),
+ )),
+ FileWriterMode::PutMultipart => {
+ let (_, multipart_writer) = object_store
+ .put_multipart(&object.location)
+ .await
+ .map_err(DataFusionError::ObjectStore)?;
+ let writer = AsyncArrowWriter::try_new(
+ multipart_writer,
+ self.config.output_schema.clone(),
+ 10485760,
+ Some(parquet_props),
+ )?;
+ Ok(writer)
+ }
+ }
+ }
+}
+
+#[async_trait]
+impl DataSink for ParquetSink {
+ async fn write_all(
+ &self,
+ mut data: Vec<SendableRecordBatchStream>,
+ context: &Arc<TaskContext>,
+ ) -> Result<u64> {
+ let num_partitions = data.len();
+ let parquet_props = self.parquet_writer_props_from_context(context)?;
+
+ let object_store = context
+ .runtime_env()
+ .object_store(&self.config.object_store_url)?;
+
+ // Construct writer for each file group
+ let mut writers = vec![];
+ match self.config.writer_mode {
+ FileWriterMode::Append => {
+ return plan_err!(
+ "Parquet format does not support appending to existing
file!"
+ )
+ }
+ FileWriterMode::Put => {
+ return Err(DataFusionError::NotImplemented(
+ "Put Mode is not implemented for ParquetSink yet".into(),
+ ))
+ }
+ FileWriterMode::PutMultipart => {
+ // Currently assuming only 1 partition path (i.e. not
hive-style partitioning on a column)
+ let base_path = &self.config.table_paths[0];
+ // Uniquely identify this batch of files with a random string,
to prevent collisions overwriting files
+ let write_id = Alphanumeric.sample_string(&mut
rand::thread_rng(), 16);
+ for part_idx in 0..num_partitions {
+ let file_path = base_path
+ .prefix()
+ .child(format!("/{}_{}.parquet", write_id, part_idx));
+ let object_meta = ObjectMeta {
+ location: file_path,
+ last_modified: chrono::offset::Utc::now(),
+ size: 0,
+ e_tag: None,
+ };
+ let writer = self
+ .create_writer(
+ object_meta.into(),
+ object_store.clone(),
+ parquet_props.clone(),
+ )
+ .await?;
+ writers.push(writer);
+ }
+ }
+ }
+
+ let mut row_count = 0;
+ // TODO parallelize serialization across partitions and batches
within partitions
+ // see: https://github.com/apache/arrow-datafusion/issues/7079
+ for idx in 0..num_partitions {
+ while let Some(batch) = data[idx].next().await.transpose()? {
+ row_count += batch.num_rows();
+ // TODO cleanup all multipart writes when any encounters an
error
+ writers[idx].write(&batch).await?;
+ }
+ }
+
+ for writer in writers {
+ writer.close().await?;
+ }
+
+ Ok(row_count as u64)
+ }
+}
+
#[cfg(test)]
pub(crate) mod test_util {
use super::*;
diff --git a/datafusion/core/src/datasource/file_format/mod.rs
b/datafusion/core/src/datasource/file_format/write.rs
similarity index 58%
copy from datafusion/core/src/datasource/file_format/mod.rs
copy to datafusion/core/src/datasource/file_format/write.rs
index 42b16656fb..c256c9689a 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/write.rs
@@ -15,35 +15,21 @@
// specific language governing permissions and limitations
// under the License.
-//! Module containing helper methods for the various file formats
+//! Module containing helper methods/traits related to enabling
+//! write support for the various file formats
-/// Default max records to scan to infer the schema
-pub const DEFAULT_SCHEMA_INFER_MAX_RECORD: usize = 1000;
-
-pub mod arrow;
-pub mod avro;
-pub mod csv;
-pub mod file_type;
-pub mod json;
-pub mod options;
-pub mod parquet;
-
-use std::any::Any;
use std::io::Error;
+use std::mem;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
-use std::{fmt, mem};
-use crate::arrow::datatypes::SchemaRef;
-use crate::datasource::physical_plan::{FileScanConfig, FileSinkConfig};
+use crate::datasource::physical_plan::FileMeta;
use crate::error::Result;
-use crate::execution::context::SessionState;
-use crate::physical_plan::{ExecutionPlan, SendableRecordBatchStream,
Statistics};
+use crate::physical_plan::SendableRecordBatchStream;
use arrow_array::RecordBatch;
use datafusion_common::DataFusionError;
-use datafusion_physical_expr::PhysicalExpr;
use async_trait::async_trait;
use bytes::Bytes;
@@ -54,67 +40,7 @@ use object_store::path::Path;
use object_store::{MultipartId, ObjectMeta, ObjectStore};
use tokio::io::{AsyncWrite, AsyncWriteExt};
-use self::file_type::FileCompressionType;
-
-use super::physical_plan::FileMeta;
-/// This trait abstracts all the file format specific implementations
-/// from the [`TableProvider`]. This helps code re-utilization across
-/// providers that support the the same file formats.
-///
-/// [`TableProvider`]: crate::datasource::provider::TableProvider
-#[async_trait]
-pub trait FileFormat: Send + Sync + fmt::Debug {
- /// Returns the table provider as [`Any`](std::any::Any) so that it can be
- /// downcast to a specific implementation.
- fn as_any(&self) -> &dyn Any;
-
- /// Infer the common schema of the provided objects. The objects will
usually
- /// be analysed up to a given number of records or files (as specified in
the
- /// format config) then give the estimated common schema. This might fail
if
- /// the files have schemas that cannot be merged.
- async fn infer_schema(
- &self,
- state: &SessionState,
- store: &Arc<dyn ObjectStore>,
- objects: &[ObjectMeta],
- ) -> Result<SchemaRef>;
-
- /// Infer the statistics for the provided object. The cost and accuracy of
the
- /// estimated statistics might vary greatly between file formats.
- ///
- /// `table_schema` is the (combined) schema of the overall table
- /// and may be a superset of the schema contained in this file.
- ///
- /// TODO: should the file source return statistics for only columns
referred to in the table schema?
- async fn infer_stats(
- &self,
- state: &SessionState,
- store: &Arc<dyn ObjectStore>,
- table_schema: SchemaRef,
- object: &ObjectMeta,
- ) -> Result<Statistics>;
-
- /// Take a list of files and convert it to the appropriate executor
- /// according to this file format.
- async fn create_physical_plan(
- &self,
- state: &SessionState,
- conf: FileScanConfig,
- filters: Option<&Arc<dyn PhysicalExpr>>,
- ) -> Result<Arc<dyn ExecutionPlan>>;
-
- /// Take a list of files and the configuration to convert it to the
- /// appropriate writer executor according to this file format.
- async fn create_writer_physical_plan(
- &self,
- _input: Arc<dyn ExecutionPlan>,
- _state: &SessionState,
- _conf: FileSinkConfig,
- ) -> Result<Arc<dyn ExecutionPlan>> {
- let msg = "Writer not implemented for this format".to_owned();
- Err(DataFusionError::NotImplemented(msg))
- }
-}
+use super::file_type::FileCompressionType;
/// `AsyncPutWriter` is an object that facilitates asynchronous writing to
object stores.
/// It is specifically designed for the `object_store` crate's `put` method
and sends
@@ -246,12 +172,12 @@ pub(crate) struct AbortableWrite<W: AsyncWrite + Unpin +
Send> {
impl<W: AsyncWrite + Unpin + Send> AbortableWrite<W> {
/// Create a new `AbortableWrite` instance with the given writer, and
write mode.
- fn new(writer: W, mode: AbortMode) -> Self {
+ pub(crate) fn new(writer: W, mode: AbortMode) -> Self {
Self { writer, mode }
}
/// handling of abort for different write modes
- fn abort_writer(&self) -> Result<BoxFuture<'static, Result<()>>> {
+ pub(crate) fn abort_writer(&self) -> Result<BoxFuture<'static,
Result<()>>> {
match &self.mode {
AbortMode::Put => Ok(async { Ok(()) }.boxed()),
AbortMode::Append => Err(DataFusionError::Execution(
@@ -310,6 +236,36 @@ pub enum FileWriterMode {
/// Data is written to a new file in multiple parts.
PutMultipart,
}
+/// A trait that defines the methods required for a RecordBatch serializer.
+///
+/// Implementors must be `Unpin + Send`. `serialize` takes `&mut self` and
+/// consumes the batch it is given.
+#[async_trait]
+pub trait BatchSerializer: Unpin + Send {
+ /// Asynchronously serializes a `RecordBatch` and returns the serialized bytes.
+ async fn serialize(&mut self, batch: RecordBatch) -> Result<Bytes>;
+}
+
+/// Passes a successful `result` through untouched. When `result` is an
+/// error, tries to abort every writer in `writers` and then returns that
+/// original error.
+async fn check_for_errors<T, W: AsyncWrite + Unpin + Send>(
+ result: Result<T>,
+ writers: &mut [AbortableWrite<W>],
+) -> Result<T> {
+ // Nothing to clean up on success.
+ let original_error = match result {
+ Ok(value) => return Ok(value),
+ Err(e) => e,
+ };
+ // Best-effort cleanup: a writer whose abort future cannot be created, or
+ // whose abort itself fails, is skipped so the remaining writers are
+ // still attempted.
+ for writer in writers.iter_mut() {
+ if let Ok(abort) = writer.abort_writer() {
+ let _ = abort.await;
+ }
+ }
+ // Report the failure that triggered the abort, not any abort failure.
+ Err(original_error)
+}
/// Returns an [`AbortableWrite`] which writes to the given object store
location
/// with the specified compression
@@ -363,43 +319,12 @@ pub(crate) async fn create_writer(
}
}
-/// A trait that defines the methods required for a RecordBatch serializer.
-#[async_trait]
-pub trait BatchSerializer: Unpin + Send {
- /// Asynchronously serializes a `RecordBatch` and returns the serialized
bytes.
- async fn serialize(&mut self, batch: RecordBatch) -> Result<Bytes>;
-}
-
-/// Checks if any of the passed writers have encountered an error
-/// and if so, all writers are aborted.
-async fn check_for_errors<T, W: AsyncWrite + Unpin + Send>(
- result: Result<T>,
- writers: &mut [AbortableWrite<W>],
-) -> Result<T> {
- match result {
- Ok(value) => Ok(value),
- Err(e) => {
- // Abort all writers before returning the error:
- for writer in writers {
- let mut abort_future = writer.abort_writer();
- if let Ok(abort_future) = &mut abort_future {
- let _ = abort_future.await;
- }
- // Ignore errors that occur during abortion,
- // We do try to abort all writers before returning error.
- }
- // After aborting writers return original error.
- Err(e)
- }
- }
-}
-
/// Contains the common logic for serializing RecordBatches and
/// writing the resulting bytes to an ObjectStore.
/// Serialization is assumed to be stateless, i.e.
/// each RecordBatch can be serialized without any
/// dependency on the RecordBatches before or after.
-async fn stateless_serialize_and_write_files(
+pub(crate) async fn stateless_serialize_and_write_files(
mut data: Vec<SendableRecordBatchStream>,
mut serializers: Vec<Box<dyn BatchSerializer>>,
mut writers: Vec<AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>>,
@@ -438,182 +363,3 @@ async fn stateless_serialize_and_write_files(
}
Ok(row_count as u64)
}
-
-#[cfg(test)]
-pub(crate) mod test_util {
- use std::ops::Range;
- use std::sync::Mutex;
-
- use super::*;
- use crate::datasource::listing::PartitionedFile;
- use crate::datasource::object_store::ObjectStoreUrl;
- use crate::test::object_store::local_unpartitioned_file;
- use bytes::Bytes;
- use futures::stream::BoxStream;
- use futures::StreamExt;
- use object_store::local::LocalFileSystem;
- use object_store::path::Path;
- use object_store::{GetOptions, GetResult, ListResult, MultipartId};
- use tokio::io::AsyncWrite;
-
- pub async fn scan_format(
- state: &SessionState,
- format: &dyn FileFormat,
- store_root: &str,
- file_name: &str,
- projection: Option<Vec<usize>>,
- limit: Option<usize>,
- ) -> Result<Arc<dyn ExecutionPlan>> {
- let store = Arc::new(LocalFileSystem::new()) as _;
- let meta =
local_unpartitioned_file(format!("{store_root}/{file_name}"));
-
- let file_schema = format.infer_schema(state, &store,
&[meta.clone()]).await?;
-
- let statistics = format
- .infer_stats(state, &store, file_schema.clone(), &meta)
- .await?;
-
- let file_groups = vec![vec![PartitionedFile {
- object_meta: meta,
- partition_values: vec![],
- range: None,
- extensions: None,
- }]];
-
- let exec = format
- .create_physical_plan(
- state,
- FileScanConfig {
- object_store_url: ObjectStoreUrl::local_filesystem(),
- file_schema,
- file_groups,
- statistics,
- projection,
- limit,
- table_partition_cols: vec![],
- output_ordering: vec![],
- infinite_source: false,
- },
- None,
- )
- .await?;
- Ok(exec)
- }
-
- /// Mock ObjectStore to provide an variable stream of bytes on get
- /// Able to keep track of how many iterations of the provided bytes were
repeated
- #[derive(Debug)]
- pub struct VariableStream {
- bytes_to_repeat: Bytes,
- max_iterations: usize,
- iterations_detected: Arc<Mutex<usize>>,
- }
-
- impl std::fmt::Display for VariableStream {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- write!(f, "VariableStream")
- }
- }
-
- #[async_trait]
- impl ObjectStore for VariableStream {
- async fn put(&self, _location: &Path, _bytes: Bytes) ->
object_store::Result<()> {
- unimplemented!()
- }
-
- async fn put_multipart(
- &self,
- _location: &Path,
- ) -> object_store::Result<(MultipartId, Box<dyn AsyncWrite + Unpin +
Send>)>
- {
- unimplemented!()
- }
-
- async fn abort_multipart(
- &self,
- _location: &Path,
- _multipart_id: &MultipartId,
- ) -> object_store::Result<()> {
- unimplemented!()
- }
-
- async fn get(&self, _location: &Path) ->
object_store::Result<GetResult> {
- let bytes = self.bytes_to_repeat.clone();
- let arc = self.iterations_detected.clone();
- Ok(GetResult::Stream(
- futures::stream::repeat_with(move || {
- let arc_inner = arc.clone();
- *arc_inner.lock().unwrap() += 1;
- Ok(bytes.clone())
- })
- .take(self.max_iterations)
- .boxed(),
- ))
- }
-
- async fn get_opts(
- &self,
- _location: &Path,
- _opts: GetOptions,
- ) -> object_store::Result<GetResult> {
- unimplemented!()
- }
-
- async fn get_ranges(
- &self,
- _location: &Path,
- _ranges: &[Range<usize>],
- ) -> object_store::Result<Vec<Bytes>> {
- unimplemented!()
- }
-
- async fn head(&self, _location: &Path) ->
object_store::Result<ObjectMeta> {
- unimplemented!()
- }
-
- async fn delete(&self, _location: &Path) -> object_store::Result<()> {
- unimplemented!()
- }
-
- async fn list(
- &self,
- _prefix: Option<&Path>,
- ) -> object_store::Result<BoxStream<'_,
object_store::Result<ObjectMeta>>>
- {
- unimplemented!()
- }
-
- async fn list_with_delimiter(
- &self,
- _prefix: Option<&Path>,
- ) -> object_store::Result<ListResult> {
- unimplemented!()
- }
-
- async fn copy(&self, _from: &Path, _to: &Path) ->
object_store::Result<()> {
- unimplemented!()
- }
-
- async fn copy_if_not_exists(
- &self,
- _from: &Path,
- _to: &Path,
- ) -> object_store::Result<()> {
- unimplemented!()
- }
- }
-
- impl VariableStream {
- pub fn new(bytes_to_repeat: Bytes, max_iterations: usize) -> Self {
- Self {
- bytes_to_repeat,
- max_iterations,
- iterations_detected: Arc::new(Mutex::new(0)),
- }
- }
-
- pub fn get_iterations_detected(&self) -> usize {
- *self.iterations_detected.lock().unwrap()
- }
- }
-}
diff --git a/datafusion/core/src/datasource/listing/table.rs
b/datafusion/core/src/datasource/listing/table.rs
index 8519628760..4f2387ad0d 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -845,10 +845,12 @@ impl TableProvider for ListingTable {
file_groups.len()
)));
}
- writer_mode =
crate::datasource::file_format::FileWriterMode::Append;
+ writer_mode =
+
crate::datasource::file_format::write::FileWriterMode::Append;
}
ListingTableInsertMode::AppendNewFiles => {
- writer_mode =
crate::datasource::file_format::FileWriterMode::PutMultipart
+ writer_mode =
+
crate::datasource::file_format::write::FileWriterMode::PutMultipart
}
ListingTableInsertMode::Error => {
return plan_err!(
@@ -963,6 +965,7 @@ mod tests {
use datafusion_common::assert_contains;
use datafusion_expr::LogicalPlanBuilder;
use rstest::*;
+ use std::collections::HashMap;
use std::fs::File;
use tempfile::TempDir;
@@ -1554,6 +1557,7 @@ mod tests {
helper_test_insert_into_append_to_existing_files(
FileType::JSON,
FileCompressionType::UNCOMPRESSED,
+ None,
)
.await?;
Ok(())
@@ -1564,6 +1568,7 @@ mod tests {
helper_test_append_new_files_to_table(
FileType::JSON,
FileCompressionType::UNCOMPRESSED,
+ None,
)
.await?;
Ok(())
@@ -1574,6 +1579,7 @@ mod tests {
helper_test_insert_into_append_to_existing_files(
FileType::CSV,
FileCompressionType::UNCOMPRESSED,
+ None,
)
.await?;
Ok(())
@@ -1584,11 +1590,127 @@ mod tests {
helper_test_append_new_files_to_table(
FileType::CSV,
FileCompressionType::UNCOMPRESSED,
+ None,
)
.await?;
Ok(())
}
+ /// Appends new Parquet files to a table using the default session
+ /// configuration (no overrides supplied).
+ #[tokio::test]
+ async fn test_insert_into_append_new_parquet_files_defaults() -> Result<()> {
+ let no_overrides = None;
+ helper_test_append_new_files_to_table(
+ FileType::PARQUET,
+ FileCompressionType::UNCOMPRESSED,
+ no_overrides,
+ )
+ .await
+ }
+
+ /// Appends new Parquet files to a table while overriding the Parquet
+ /// writer options through `datafusion.execution.parquet.*` session
+ /// configuration keys.
+ #[tokio::test]
+ async fn test_insert_into_append_new_parquet_files_session_overrides() -> Result<()> {
+ // Session-level overrides handed to the helper as string key/value pairs.
+ let mut config_map: HashMap<String, String> = HashMap::new();
+ config_map.insert(
+ "datafusion.execution.parquet.compression".into(),
+ "zstd(5)".into(),
+ );
+ config_map.insert(
+ "datafusion.execution.parquet.dictionary_enabled".into(),
+ "false".into(),
+ );
+ config_map.insert(
+ "datafusion.execution.parquet.dictionary_page_size_limit".into(),
+ "100".into(),
+ );
+ // Key spelling must match the registered option name
+ // (`statistics_enabled`), otherwise this override is not exercised.
+ config_map.insert(
+ "datafusion.execution.parquet.statistics_enabled".into(),
+ "none".into(),
+ );
+ config_map.insert(
+ "datafusion.execution.parquet.max_statistics_size".into(),
+ "10".into(),
+ );
+ config_map.insert(
+ "datafusion.execution.parquet.max_row_group_size".into(),
+ "5".into(),
+ );
+ config_map.insert(
+ "datafusion.execution.parquet.created_by".into(),
+ "datafusion test".into(),
+ );
+ config_map.insert(
+ "datafusion.execution.parquet.column_index_truncate_length".into(),
+ "50".into(),
+ );
+ config_map.insert(
+ "datafusion.execution.parquet.data_page_row_count_limit".into(),
+ "50".into(),
+ );
+ config_map.insert(
+ "datafusion.execution.parquet.encoding".into(),
+ "delta_binary_packed".into(),
+ );
+ config_map.insert(
+ "datafusion.execution.parquet.bloom_filter_enabled".into(),
+ "true".into(),
+ );
+ config_map.insert(
+ "datafusion.execution.parquet.bloom_filter_fpp".into(),
+ "0.01".into(),
+ );
+ config_map.insert(
+ "datafusion.execution.parquet.bloom_filter_ndv".into(),
+ "1000".into(),
+ );
+ config_map.insert(
+ "datafusion.execution.parquet.writer_version".into(),
+ "2.0".into(),
+ );
+ config_map.insert(
+ "datafusion.execution.parquet.write_batch_size".into(),
+ "5".into(),
+ );
+ helper_test_append_new_files_to_table(
+ FileType::PARQUET,
+ FileCompressionType::UNCOMPRESSED,
+ Some(config_map),
+ )
+ .await?;
+ Ok(())
+ }
+
+ /// Supplies a `zstd` compression setting without a level and checks that
+ /// the append fails with the expected planning error message.
+ #[tokio::test]
+ async fn test_insert_into_append_new_parquet_files_invalid_session_fails(
+ ) -> Result<()> {
+ let mut config_map: HashMap<String, String> = HashMap::new();
+ config_map.insert(
+ "datafusion.execution.parquet.compression".into(),
+ "zstd".into(),
+ );
+ let e = helper_test_append_new_files_to_table(
+ FileType::PARQUET,
+ FileCompressionType::UNCOMPRESSED,
+ Some(config_map),
+ )
+ .await
+ .expect_err("Example should fail!");
+ // The expected message is a single line; it must not contain a line break.
+ assert_eq!(
+ "Error during planning: zstd compression requires specifying a level such as zstd(4)",
+ e.to_string()
+ );
+ Ok(())
+ }
+
+ /// Checks that inserting in append-to-existing-file mode is rejected for
+ /// Parquet tables: the helper is expected to return an error.
+ #[tokio::test]
+ async fn test_insert_into_append_to_parquet_file_fails() -> Result<()> {
+ let maybe_err = helper_test_insert_into_append_to_existing_files(
+ FileType::PARQUET,
+ FileCompressionType::UNCOMPRESSED,
+ None,
+ )
+ .await;
+ // Only the fact that an error occurred matters; its content is not inspected.
+ let _err =
+ maybe_err.expect_err("Appending to existing parquet file did not fail!");
+ Ok(())
+ }
+
fn load_empty_schema_table(
schema: SchemaRef,
temp_path: &str,
@@ -1615,9 +1737,16 @@ mod tests {
async fn helper_test_insert_into_append_to_existing_files(
file_type: FileType,
file_compression_type: FileCompressionType,
+ session_config_map: Option<HashMap<String, String>>,
) -> Result<()> {
// Create the initial context, schema, and batch.
- let session_ctx = SessionContext::new();
+ let session_ctx = match session_config_map {
+ Some(cfg) => {
+ let config = SessionConfig::from_string_hash_map(cfg)?;
+ SessionContext::with_config(config)
+ }
+ None => SessionContext::new(),
+ };
// Create a new schema with one field called "a" of type Int32
let schema = Arc::new(Schema::new(vec![Field::new(
"column1",
@@ -1777,9 +1906,17 @@ mod tests {
async fn helper_test_append_new_files_to_table(
file_type: FileType,
file_compression_type: FileCompressionType,
+ session_config_map: Option<HashMap<String, String>>,
) -> Result<()> {
// Create the initial context, schema, and batch.
- let session_ctx = SessionContext::new();
+ let session_ctx = match session_config_map {
+ Some(cfg) => {
+ let config = SessionConfig::from_string_hash_map(cfg)?;
+ SessionContext::with_config(config)
+ }
+ None => SessionContext::new(),
+ };
+
// Create a new schema with one field called "a" of type Int32
let schema = Arc::new(Schema::new(vec![Field::new(
"column1",
@@ -1825,9 +1962,9 @@ mod tests {
.register_parquet(
"t",
tmp_dir.path().to_str().unwrap(),
- ParquetReadOptions::default(), // TODO implement
insert_mode for parquet
-
//.insert_mode(ListingTableInsertMode::AppendNewFiles)
-
//.schema(schema.as_ref()),
+ ParquetReadOptions::default()
+
.insert_mode(ListingTableInsertMode::AppendNewFiles)
+ .schema(schema.as_ref()),
)
.await?;
}
@@ -1894,16 +2031,16 @@ mod tests {
// Read the records in the table
let batches = session_ctx
- .sql("select count(*) from t")
+ .sql("select count(*) as count from t")
.await?
.collect()
.await?;
let expected = vec![
- "+----------+",
- "| COUNT(*) |",
- "+----------+",
- "| 6 |",
- "+----------+",
+ "+-------+",
+ "| count |",
+ "+-------+",
+ "| 6 |",
+ "+-------+",
];
// Assert that the batches read from the file match the expected
result.
@@ -1935,18 +2072,18 @@ mod tests {
// Read the contents of the table
let batches = session_ctx
- .sql("select count(*) from t")
+ .sql("select count(*) AS count from t")
.await?
.collect()
.await?;
// Define the expected result after the second append.
let expected = vec![
- "+----------+",
- "| COUNT(*) |",
- "+----------+",
- "| 12 |",
- "+----------+",
+ "+-------+",
+ "| count |",
+ "+-------+",
+ "| 12 |",
+ "+-------+",
];
// Assert that the batches read from the file after the second append
match the expected result.
diff --git a/datafusion/core/src/datasource/physical_plan/file_stream.rs
b/datafusion/core/src/datasource/physical_plan/file_stream.rs
index 2c4437de0a..6ac073c34f 100644
--- a/datafusion/core/src/datasource/physical_plan/file_stream.rs
+++ b/datafusion/core/src/datasource/physical_plan/file_stream.rs
@@ -522,7 +522,7 @@ mod tests {
use datafusion_common::DataFusionError;
use super::*;
- use crate::datasource::file_format::BatchSerializer;
+ use crate::datasource::file_format::write::BatchSerializer;
use crate::datasource::object_store::ObjectStoreUrl;
use crate::datasource::physical_plan::FileMeta;
use crate::physical_plan::metrics::ExecutionPlanMetricsSet;
diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs
b/datafusion/core/src/datasource/physical_plan/mod.rs
index 3e607d121a..b0914b0816 100644
--- a/datafusion/core/src/datasource/physical_plan/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/mod.rs
@@ -46,7 +46,7 @@ pub use json::{JsonOpener, NdJsonExec};
use crate::physical_plan::ExecutionPlan;
use crate::{
- datasource::file_format::FileWriterMode,
+ datasource::file_format::write::FileWriterMode,
physical_plan::{DisplayAs, DisplayFormatType},
};
use crate::{
diff --git a/datafusion/core/src/execution/context.rs
b/datafusion/core/src/execution/context.rs
index ce9165f027..6593e22e6c 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -1024,8 +1024,14 @@ impl SessionContext {
) -> Result<()> {
let listing_options =
options.to_listing_options(&self.state.read().config);
- self.register_listing_table(name, table_path, listing_options, None,
None)
- .await?;
+ self.register_listing_table(
+ name,
+ table_path,
+ listing_options,
+ options.schema.map(|s| Arc::new(s.to_owned())),
+ None,
+ )
+ .await?;
Ok(())
}
diff --git a/datafusion/core/tests/sql/expr.rs
b/datafusion/core/tests/sql/expr.rs
index fc0a4e7c7e..36786bd079 100644
--- a/datafusion/core/tests/sql/expr.rs
+++ b/datafusion/core/tests/sql/expr.rs
@@ -448,8 +448,10 @@ async fn test_substring_expr() -> Result<()> {
Ok(())
}
+/// String expression tests are split into two batches
+/// to prevent a stack overflow error
#[tokio::test]
-async fn test_string_expressions() -> Result<()> {
+async fn test_string_expressions_batch1() -> Result<()> {
test_expression!("ascii('')", "0");
test_expression!("ascii('x')", "120");
test_expression!("ascii(NULL)", "NULL");
@@ -501,6 +503,13 @@ async fn test_string_expressions() -> Result<()> {
test_expression!("rtrim(' zzzytest ', NULL)", "NULL");
test_expression!("rtrim('testxxzx', 'xyz')", "test");
test_expression!("rtrim(NULL, 'xyz')", "NULL");
+ Ok(())
+}
+
+/// String expression tests are split into two batches
+/// to prevent a stack overflow error
+#[tokio::test]
+async fn test_string_expressions_batch2() -> Result<()> {
test_expression!("split_part('abc~@~def~@~ghi', '~@~', 2)", "def");
test_expression!("split_part('abc~@~def~@~ghi', '~@~', 20)", "");
test_expression!("split_part(NULL, '~@~', 20)", "NULL");
diff --git
a/datafusion/core/tests/sqllogictests/test_files/information_schema.slt
b/datafusion/core/tests/sqllogictests/test_files/information_schema.slt
index 162e208201..452a8709c5 100644
--- a/datafusion/core/tests/sqllogictests/test_files/information_schema.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/information_schema.slt
@@ -146,12 +146,28 @@ datafusion.execution.aggregate.scalar_update_factor 10
datafusion.execution.batch_size 8192
datafusion.execution.coalesce_batches true
datafusion.execution.collect_statistics false
+datafusion.execution.parquet.bloom_filter_enabled false
+datafusion.execution.parquet.bloom_filter_fpp 0.05
+datafusion.execution.parquet.bloom_filter_ndv 1000000
+datafusion.execution.parquet.column_index_truncate_length NULL
+datafusion.execution.parquet.compression snappy
+datafusion.execution.parquet.created_by datafusion version 28.0.0
+datafusion.execution.parquet.data_page_row_count_limit 18446744073709551615
+datafusion.execution.parquet.data_pagesize_limit 1048576
+datafusion.execution.parquet.dictionary_enabled true
+datafusion.execution.parquet.dictionary_page_size_limit 1048576
datafusion.execution.parquet.enable_page_index true
+datafusion.execution.parquet.encoding plain
+datafusion.execution.parquet.max_row_group_size 1048576
+datafusion.execution.parquet.max_statistics_size 4096
datafusion.execution.parquet.metadata_size_hint NULL
datafusion.execution.parquet.pruning true
datafusion.execution.parquet.pushdown_filters false
datafusion.execution.parquet.reorder_filters false
datafusion.execution.parquet.skip_metadata true
+datafusion.execution.parquet.statistics_enabled page
+datafusion.execution.parquet.write_batch_size 1024
+datafusion.execution.parquet.writer_version 1.0
datafusion.execution.planning_concurrency 13
datafusion.execution.sort_in_place_threshold_bytes 1048576
datafusion.execution.sort_spill_reservation_bytes 10485760
diff --git a/docs/source/user-guide/configs.md
b/docs/source/user-guide/configs.md
index 63c9c064bc..50d9cb7c8b 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -35,47 +35,63 @@ Values are parsed according to the [same rules used in
casts from Utf8](https://
If the value in the environment variable cannot be cast to the type of the
configuration option, the default value will be used instead and a warning
emitted.
Environment variables are read during `SessionConfig` initialisation so they
must be set beforehand and will not affect running sessions.
-| key | default |
description
[...]
-| ---------------------------------------------------------- | ---------- |
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
-| datafusion.catalog.create_default_catalog_and_schema | true |
Whether the default catalog and schema should be created automatically.
[...]
-| datafusion.catalog.default_catalog | datafusion |
The default catalog name - this impacts what SQL queries use if not specified
[...]
-| datafusion.catalog.default_schema | public |
The default schema name - this impacts what SQL queries use if not specified
[...]
-| datafusion.catalog.information_schema | false |
Should DataFusion provide access to `information_schema` virtual tables for
displaying schema information
[...]
-| datafusion.catalog.location | NULL |
Location scanned to load tables for `default` schema
[...]
-| datafusion.catalog.format | NULL |
Type of `TableProvider` to use when loading `default` schema
[...]
-| datafusion.catalog.has_header | false | If
the file has a header
[...]
-| datafusion.execution.batch_size | 8192 |
Default batch size while creating new batches, it's especially useful for
buffer-in-memory batches since creating tiny batches would result in too much
metadata memory consumption
[...]
-| datafusion.execution.coalesce_batches | true |
When set to true, record batches will be examined between each operator and
small batches will be coalesced into larger batches. This is helpful when there
are highly selective filters or joins that could produce tiny output batches.
The target batch size is determined by the configuration setting
[...]
-| datafusion.execution.collect_statistics | false |
Should DataFusion collect statistics after listing files
[...]
-| datafusion.execution.target_partitions | 0 |
Number of partitions for query execution. Increasing partitions can increase
concurrency. Defaults to the number of CPU cores on the system
[...]
-| datafusion.execution.time_zone | +00:00 |
The default time zone Some functions, e.g. `EXTRACT(HOUR from SOME_TIME)`,
shift the underlying datetime according to this time zone, and then extract the
hour
[...]
-| datafusion.execution.parquet.enable_page_index | true | If
true, reads the Parquet data page level metadata (the Page Index), if present,
to reduce the I/O and number of rows decoded.
[...]
-| datafusion.execution.parquet.pruning | true | If
true, the parquet reader attempts to skip entire row groups based on the
predicate in the query and the metadata (min/max values) stored in the parquet
file
[...]
-| datafusion.execution.parquet.skip_metadata | true | If
true, the parquet reader skip the optional embedded metadata that may be in the
file Schema. This setting can help avoid schema conflicts when querying
multiple parquet files with schemas containing compatible types but different
metadata
[...]
-| datafusion.execution.parquet.metadata_size_hint | NULL | If
specified, the parquet reader will try and fetch the last `size_hint` bytes of
the parquet file optimistically. If not specified, two reads are required: One
read to fetch the 8-byte parquet footer and another to fetch the metadata
length encoded in the footer
[...]
-| datafusion.execution.parquet.pushdown_filters | false | If
true, filter expressions are be applied during the parquet decoding operation
to reduce the number of rows decoded
[...]
-| datafusion.execution.parquet.reorder_filters | false | If
true, filter expressions evaluated during the parquet decoding operation will
be reordered heuristically to minimize the cost of evaluation. If false, the
filters are applied in the same order as written in the query
[...]
-| datafusion.execution.aggregate.scalar_update_factor | 10 |
Specifies the threshold for using `ScalarValue`s to update accumulators during
high-cardinality aggregations for each input batch. The aggregation is
considered high-cardinality if the number of affected groups is greater than or
equal to `batch_size / scalar_update_factor`. In such cases, `ScalarValue`s are
utilized for updating accumulators, rather than the default batch-slice
approach. This can lead to perform [...]
-| datafusion.execution.planning_concurrency | 0 |
Fan-out during initial physical planning. This is mostly use to plan `UNION`
children in parallel. Defaults to the number of CPU cores on the system
[...]
-| datafusion.execution.sort_spill_reservation_bytes | 10485760 |
Specifies the reserved memory for each spillable sort operation to facilitate
an in-memory merge. When a sort operation spills to disk, the in-memory data
must be sorted and merged before being written to a file. This setting reserves
a specific amount of memory for that in-memory sort/merge process. Note: This
setting is irrelevant if the sort operation cannot spill (i.e., if there's no
`DiskManager` configured) [...]
-| datafusion.execution.sort_in_place_threshold_bytes | 1048576 |
When sorting, below what size should data be concatenated and sorted in a
single RecordBatch rather than sorted in batches and merged.
[...]
-| datafusion.optimizer.enable_round_robin_repartition | true |
When set to true, the physical plan optimizer will try to add round robin
repartitioning to increase parallelism to leverage more CPU cores
[...]
-| datafusion.optimizer.filter_null_join_keys | false |
When set to true, the optimizer will insert filters before a join between a
nullable and non-nullable column to filter out nulls on the nullable side. This
filter can add additional overhead when the file format does not fully support
predicate push down.
[...]
-| datafusion.optimizer.repartition_aggregations | true |
Should DataFusion repartition data using the aggregate keys to execute
aggregates in parallel using the provided `target_partitions` level
[...]
-| datafusion.optimizer.repartition_file_min_size | 10485760 |
Minimum total files size in bytes to perform file scan repartitioning.
[...]
-| datafusion.optimizer.repartition_joins | true |
Should DataFusion repartition data using the join keys to execute joins in
parallel using the provided `target_partitions` level
[...]
-| datafusion.optimizer.allow_symmetric_joins_without_pruning | true |
Should DataFusion allow symmetric hash joins for unbounded data sources even
when its inputs do not have any ordering or filtering If the flag is not
enabled, the SymmetricHashJoin operator will be unable to prune its internal
buffers, resulting in certain join types - such as Full, Left, LeftAnti,
LeftSemi, Right, RightAnti, and RightSemi - being produced only at the end of
the execution. This is not typical in [...]
-| datafusion.optimizer.repartition_file_scans | true |
When set to `true`, file groups will be repartitioned to achieve maximum
parallelism. Currently Parquet and CSV formats are supported. If set to `true`,
all files will be repartitioned evenly (i.e., a single large file might be
partitioned into smaller chunks) for parallel scanning. If set to `false`,
different files will be read in parallel, but repartitioning won't happen
within a single file. [...]
-| datafusion.optimizer.repartition_windows | true |
Should DataFusion repartition data using the partitions keys to execute window
functions in parallel using the provided `target_partitions` level
[...]
-| datafusion.optimizer.repartition_sorts | true |
Should DataFusion execute sorts in a per-partition fashion and merge afterwards
instead of coalescing first and sorting globally. With this flag is enabled,
plans in the form below `text "SortExec: [a@0 ASC]", " CoalescePartitionsExec",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", `
would turn into the plan below which performs better in multithreaded
environments `text "SortPreserving [...]
-| datafusion.optimizer.bounded_order_preserving_variants | false |
When true, DataFusion will opportunistically remove sorts by replacing
`RepartitionExec` with `SortPreservingRepartitionExec`, and
`CoalescePartitionsExec` with `SortPreservingMergeExec`, even when the query is
bounded.
[...]
-| datafusion.optimizer.skip_failed_rules | false |
When set to true, the logical plan optimizer will produce warning messages if
any optimization rules produce errors and then proceed to the next rule. When
set to false, any rules that produce errors will cause the query to fail
[...]
-| datafusion.optimizer.max_passes | 3 |
Number of times that the optimizer will attempt to optimize the plan
[...]
-| datafusion.optimizer.top_down_join_key_reordering | true |
When set to true, the physical plan optimizer will run a top down process to
reorder the join keys
[...]
-| datafusion.optimizer.prefer_hash_join | true |
When set to true, the physical plan optimizer will prefer HashJoin over
SortMergeJoin. HashJoin can work more efficiently than SortMergeJoin but
consumes more memory
[...]
-| datafusion.optimizer.hash_join_single_partition_threshold | 1048576 |
The maximum estimated size in bytes for one input side of a HashJoin will be
collected into a single partition
[...]
-| datafusion.explain.logical_plan_only | false |
When set to true, the explain statement will only print logical plans
[...]
-| datafusion.explain.physical_plan_only | false |
When set to true, the explain statement will only print physical plans
[...]
-| datafusion.sql_parser.parse_float_as_decimal | false |
When set to true, SQL parser will parse float as decimal type
[...]
-| datafusion.sql_parser.enable_ident_normalization | true |
When set to true, SQL parser will normalize ident (convert ident to lowercase
when not quoted)
[...]
-| datafusion.sql_parser.dialect | generic |
Configure the SQL dialect used by DataFusion's parser; supported values
include: Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL,
ClickHouse, BigQuery, and Ansi.
[...]
+| key | default
| description
[...]
+| ---------------------------------------------------------- |
------------------------- |
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
+| datafusion.catalog.create_default_catalog_and_schema | true
| Whether the default catalog and schema should be created
automatically.
[...]
+| datafusion.catalog.default_catalog | datafusion
| The default catalog name - this impacts what SQL queries use if not
specified
[...]
+| datafusion.catalog.default_schema | public
| The default schema name - this impacts what SQL queries use if not
specified
[...]
+| datafusion.catalog.information_schema | false
| Should DataFusion provide access to `information_schema` virtual
tables for displaying schema information
[...]
+| datafusion.catalog.location | NULL
| Location scanned to load tables for `default` schema
[...]
+| datafusion.catalog.format | NULL
| Type of `TableProvider` to use when loading `default` schema
[...]
+| datafusion.catalog.has_header | false
| If the file has a header
[...]
+| datafusion.execution.batch_size | 8192
| Default batch size while creating new batches, it's especially
useful for buffer-in-memory batches since creating tiny batches would result in
too much metadata memory consumption
[...]
+| datafusion.execution.coalesce_batches | true
| When set to true, record batches will be examined between each
operator and small batches will be coalesced into larger batches. This is
helpful when there are highly selective filters or joins that could produce
tiny output batches. The target batch size is determined by the configuration
setting
[...]
+| datafusion.execution.collect_statistics | false
| Should DataFusion collect statistics after listing files
[...]
+| datafusion.execution.target_partitions | 0
| Number of partitions for query execution. Increasing partitions can
increase concurrency. Defaults to the number of CPU cores on the system
[...]
+| datafusion.execution.time_zone | +00:00
| The default time zone. Some functions, e.g. `EXTRACT(HOUR from
SOME_TIME)`, shift the underlying datetime according to this time zone, and
then extract the hour
[...]
+| datafusion.execution.parquet.enable_page_index | true
| If true, reads the Parquet data page level metadata (the Page
Index), if present, to reduce the I/O and number of rows decoded.
[...]
+| datafusion.execution.parquet.pruning | true
| If true, the parquet reader attempts to skip entire row groups
based on the predicate in the query and the metadata (min/max values) stored in
the parquet file
[...]
+| datafusion.execution.parquet.skip_metadata | true
| If true, the parquet reader skips the optional embedded metadata
that may be in the file Schema. This setting can help avoid schema conflicts
when querying multiple parquet files with schemas containing compatible types
but different metadata
[...]
+| datafusion.execution.parquet.metadata_size_hint | NULL
| If specified, the parquet reader will try and fetch the last
`size_hint` bytes of the parquet file optimistically. If not specified, two
reads are required: One read to fetch the 8-byte parquet footer and another to
fetch the metadata length encoded in the footer
[...]
+| datafusion.execution.parquet.pushdown_filters | false
| If true, filter expressions are applied during the parquet
decoding operation to reduce the number of rows decoded
[...]
+| datafusion.execution.parquet.reorder_filters | false
| If true, filter expressions evaluated during the parquet decoding
operation will be reordered heuristically to minimize the cost of evaluation.
If false, the filters are applied in the same order as written in the query
[...]
+| datafusion.execution.parquet.data_pagesize_limit | 1048576
| Sets best effort maximum size of data page in bytes
[...]
+| datafusion.execution.parquet.write_batch_size | 1024
| Sets write_batch_size in bytes
[...]
+| datafusion.execution.parquet.writer_version | 1.0
| Sets parquet writer version; valid values are "1.0" and "2.0"
[...]
+| datafusion.execution.parquet.compression | snappy
| Sets default parquet compression codec. Valid values are:
uncompressed, snappy, gzip(level), lzo, brotli(level), lz4, zstd(level), and
lz4_raw. These values are not case sensitive.
[...]
+| datafusion.execution.parquet.dictionary_enabled | true
| Sets if dictionary encoding is enabled
[...]
+| datafusion.execution.parquet.dictionary_page_size_limit | 1048576
| Sets best effort maximum dictionary page size, in bytes
[...]
+| datafusion.execution.parquet.statistics_enabled | page
| Sets if statistics are enabled for any column. Valid values are:
"none", "chunk", and "page". These values are not case sensitive.
[...]
+| datafusion.execution.parquet.max_statistics_size | 4096
| Sets max statistics size for any column
[...]
+| datafusion.execution.parquet.max_row_group_size | 1048576
| Sets maximum number of rows in a row group
[...]
+| datafusion.execution.parquet.created_by | datafusion
version 28.0.0 | Sets "created by" property
[...]
+| datafusion.execution.parquet.column_index_truncate_length | NULL
| Sets column index truncate length
[...]
+| datafusion.execution.parquet.data_page_row_count_limit |
18446744073709551615 | Sets best effort maximum number of rows in data
page
[...]
+| datafusion.execution.parquet.encoding | plain
| Sets default encoding for any column. Valid values are: plain,
plain_dictionary, rle, bit_packed, delta_binary_packed,
delta_length_byte_array, delta_byte_array, rle_dictionary, and
byte_stream_split. These values are not case sensitive.
[...]
+| datafusion.execution.parquet.bloom_filter_enabled | false
| Sets if bloom filter is enabled for any column
[...]
+| datafusion.execution.parquet.bloom_filter_fpp | 0.05
| Sets bloom filter false positive probability
[...]
+| datafusion.execution.parquet.bloom_filter_ndv | 1000000
| Sets bloom filter number of distinct values
[...]
+| datafusion.execution.aggregate.scalar_update_factor | 10
| Specifies the threshold for using `ScalarValue`s to update
accumulators during high-cardinality aggregations for each input batch. The
aggregation is considered high-cardinality if the number of affected groups is
greater than or equal to `batch_size / scalar_update_factor`. In such cases,
`ScalarValue`s are utilized for updating accumulators, rather than the default
batch-slice approach. This can [...]
+| datafusion.execution.planning_concurrency | 0
| Fan-out during initial physical planning. This is mostly used to
plan `UNION` children in parallel. Defaults to the number of CPU cores on the
system
[...]
+| datafusion.execution.sort_spill_reservation_bytes | 10485760
| Specifies the reserved memory for each spillable sort operation to
facilitate an in-memory merge. When a sort operation spills to disk, the
in-memory data must be sorted and merged before being written to a file. This
setting reserves a specific amount of memory for that in-memory sort/merge
process. Note: This setting is irrelevant if the sort operation cannot spill
(i.e., if there's no `DiskManag [...]
+| datafusion.execution.sort_in_place_threshold_bytes | 1048576
| When sorting, below what size should data be concatenated and
sorted in a single RecordBatch rather than sorted in batches and merged.
[...]
+| datafusion.optimizer.enable_round_robin_repartition | true
| When set to true, the physical plan optimizer will try to add round
robin repartitioning to increase parallelism to leverage more CPU cores
[...]
+| datafusion.optimizer.filter_null_join_keys | false
| When set to true, the optimizer will insert filters before a join
between a nullable and non-nullable column to filter out nulls on the nullable
side. This filter can add additional overhead when the file format does not
fully support predicate push down.
[...]
+| datafusion.optimizer.repartition_aggregations | true
| Should DataFusion repartition data using the aggregate keys to
execute aggregates in parallel using the provided `target_partitions` level
[...]
+| datafusion.optimizer.repartition_file_min_size | 10485760
| Minimum total files size in bytes to perform file scan
repartitioning.
[...]
+| datafusion.optimizer.repartition_joins | true
| Should DataFusion repartition data using the join keys to execute
joins in parallel using the provided `target_partitions` level
[...]
+| datafusion.optimizer.allow_symmetric_joins_without_pruning | true
| Should DataFusion allow symmetric hash joins for unbounded data
sources even when its inputs do not have any ordering or filtering. If the flag
is not enabled, the SymmetricHashJoin operator will be unable to prune its
internal buffers, resulting in certain join types - such as Full, Left,
LeftAnti, LeftSemi, Right, RightAnti, and RightSemi - being produced only at
the end of the execution. This is [...]
+| datafusion.optimizer.repartition_file_scans | true
| When set to `true`, file groups will be repartitioned to achieve
maximum parallelism. Currently Parquet and CSV formats are supported. If set to
`true`, all files will be repartitioned evenly (i.e., a single large file might
be partitioned into smaller chunks) for parallel scanning. If set to `false`,
different files will be read in parallel, but repartitioning won't happen
within a single file. [...]
+| datafusion.optimizer.repartition_windows | true
| Should DataFusion repartition data using the partitions keys to
execute window functions in parallel using the provided `target_partitions`
level
[...]
+| datafusion.optimizer.repartition_sorts | true
| Should DataFusion execute sorts in a per-partition fashion and
merge afterwards instead of coalescing first and sorting globally. When this
flag is enabled, plans in the form below `text "SortExec: [a@0 ASC]", "
CoalescePartitionsExec", " RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1", ` would turn into the plan below which performs better in
multithreaded environments `text [...]
+| datafusion.optimizer.bounded_order_preserving_variants | false
| When true, DataFusion will opportunistically remove sorts by
replacing `RepartitionExec` with `SortPreservingRepartitionExec`, and
`CoalescePartitionsExec` with `SortPreservingMergeExec`, even when the query is
bounded.
[...]
+| datafusion.optimizer.skip_failed_rules | false
| When set to true, the logical plan optimizer will produce warning
messages if any optimization rules produce errors and then proceed to the next
rule. When set to false, any rules that produce errors will cause the query to
fail
[...]
+| datafusion.optimizer.max_passes | 3
| Number of times that the optimizer will attempt to optimize the
plan
[...]
+| datafusion.optimizer.top_down_join_key_reordering | true
| When set to true, the physical plan optimizer will run a top down
process to reorder the join keys
[...]
+| datafusion.optimizer.prefer_hash_join | true
| When set to true, the physical plan optimizer will prefer HashJoin
over SortMergeJoin. HashJoin can work more efficiently than SortMergeJoin but
consumes more memory
[...]
+| datafusion.optimizer.hash_join_single_partition_threshold | 1048576
| The maximum estimated size in bytes for one input side of a
HashJoin will be collected into a single partition
[...]
+| datafusion.explain.logical_plan_only | false
| When set to true, the explain statement will only print logical
plans
[...]
+| datafusion.explain.physical_plan_only | false
| When set to true, the explain statement will only print physical
plans
[...]
+| datafusion.sql_parser.parse_float_as_decimal | false
| When set to true, SQL parser will parse float as decimal type
[...]
+| datafusion.sql_parser.enable_ident_normalization | true
| When set to true, SQL parser will normalize ident (convert ident to
lowercase when not quoted)
[...]
+| datafusion.sql_parser.dialect | generic
| Configure the SQL dialect used by DataFusion's parser; supported
values include: Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift,
MsSQL, ClickHouse, BigQuery, and Ansi.
[...]