This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new b54e648a7e Supporting writing schema metadata when writing Parquet in parallel (#13866)
b54e648a7e is described below
commit b54e648a7ee82e1ef292ef36df7d49902171b94f
Author: wiedld <[email protected]>
AuthorDate: Wed Jan 1 08:03:11 2025 -0500
Supporting writing schema metadata when writing Parquet in parallel (#13866)
* refactor: make ParquetSink tests a bit more readable
* chore(11770): add new ParquetOptions.skip_arrow_metadata
* test(11770): demonstrate that the single-threaded ParquetSink already writes the arrow schema into the kv_meta, and allow disablement
* refactor(11770): replace with a new method, since the kv_metadata is inherent to TableParquetOptions and therefore the API should make it explicit whether the arrow schema is included or not
* fix(11770): fix parallel ParquetSink to encode the arrow schema into the file metadata, based on the ParquetOptions
* refactor(11770): provide deprecation warning for TryFrom
* test(11770): update tests with the new default to include the arrow schema
* refactor: include partitioning of the arrow schema inserted into the kv_metadata
* test: update tests for the new config prop, as well as the new file partition offsets based upon the larger metadata
* chore: avoid cloning in tests, and update code docs
* refactor: return to the WriterPropertiesBuilder::TryFrom<TableParquetOptions>, and separately add the arrow_schema to the kv_metadata on the TableParquetOptions
* refactor: require the arrow_schema key to be present in the kv_metadata, if it is required by the configuration
* chore: update configs.md
* test: update tests to handle the (default) required arrow schema in the kv_metadata
* chore: add reference to the arrow-rs upstream PR
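
For orientation, here is a minimal sketch (not part of the commit; names and defaults are taken from the diff below) of how the new knob is toggled through the `TableParquetOptions` builder API:

    use datafusion_common::config::TableParquetOptions;

    fn main() {
        // Default after this change: the arrow schema is embedded in the
        // parquet kv_metadata when writing.
        let opts = TableParquetOptions::default();
        assert!(!opts.global.skip_arrow_metadata);

        // Opt out of embedding the arrow schema, mirroring the parquet crate's
        // `ArrowWriterOptions::with_skip_arrow_metadata`.
        let opts = opts.with_skip_arrow_metadata(true);
        assert!(opts.global.skip_arrow_metadata);
    }

The same switch is exposed as the `datafusion.execution.parquet.skip_arrow_metadata` session setting (see the configs.md and information_schema.slt changes below).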
---
datafusion-cli/Cargo.lock | 2 +
datafusion/common/Cargo.toml | 2 +
datafusion/common/src/config.rs | 20 ++
datafusion/common/src/file_options/mod.rs | 15 +-
.../common/src/file_options/parquet_writer.rs | 113 ++++++++-
.../core/src/datasource/file_format/parquet.rs | 279 +++++++++++++++------
.../proto-common/proto/datafusion_common.proto | 1 +
datafusion/proto-common/src/from_proto/mod.rs | 1 +
datafusion/proto-common/src/generated/pbjson.rs | 18 ++
datafusion/proto-common/src/generated/prost.rs | 3 +
datafusion/proto-common/src/to_proto/mod.rs | 1 +
.../proto/src/generated/datafusion_proto_common.rs | 3 +
datafusion/proto/src/logical_plan/file_formats.rs | 2 +
.../sqllogictest/test_files/information_schema.slt | 2 +
.../sqllogictest/test_files/repartition_scan.slt | 8 +-
docs/source/user-guide/configs.md | 1 +
16 files changed, 380 insertions(+), 91 deletions(-)
diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index 92dcf24708..4e2837e481 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -1335,7 +1335,9 @@ dependencies = [
"arrow",
"arrow-array",
"arrow-buffer",
+ "arrow-ipc",
"arrow-schema",
+ "base64 0.22.1",
"half",
"hashbrown 0.14.5",
"indexmap",
diff --git a/datafusion/common/Cargo.toml b/datafusion/common/Cargo.toml
index b331a55a98..feba589082 100644
--- a/datafusion/common/Cargo.toml
+++ b/datafusion/common/Cargo.toml
@@ -53,7 +53,9 @@ apache-avro = { version = "0.17", default-features = false, features = [
arrow = { workspace = true }
arrow-array = { workspace = true }
arrow-buffer = { workspace = true }
+arrow-ipc = { workspace = true }
arrow-schema = { workspace = true }
+base64 = "0.22.1"
half = { workspace = true }
hashbrown = { workspace = true }
indexmap = { workspace = true }
diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index 942aa308e2..4da6921ba5 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -436,6 +436,12 @@ config_namespace! {
/// valid values are "1.0" and "2.0"
pub writer_version: String, default = "1.0".to_string()
+ /// (writing) Skip encoding the embedded arrow metadata in the KV_meta
+ ///
+    /// This is analogous to the `ArrowWriterOptions::with_skip_arrow_metadata`.
+    /// Refer to <https://docs.rs/parquet/53.3.0/parquet/arrow/arrow_writer/struct.ArrowWriterOptions.html#method.with_skip_arrow_metadata>
+ pub skip_arrow_metadata: bool, default = false
+
/// (writing) Sets default parquet compression codec.
/// Valid values are: uncompressed, snappy, gzip(level),
/// lzo, brotli(level), lz4, zstd(level), and lz4_raw.
@@ -1496,6 +1502,20 @@ impl TableParquetOptions {
pub fn new() -> Self {
Self::default()
}
+
+ /// Set whether the encoding of the arrow metadata should occur
+ /// during the writing of parquet.
+ ///
+ /// Default is to encode the arrow schema in the file kv_metadata.
+ pub fn with_skip_arrow_metadata(self, skip: bool) -> Self {
+ Self {
+ global: ParquetOptions {
+ skip_arrow_metadata: skip,
+ ..self.global
+ },
+ ..self
+ }
+ }
}
impl ConfigField for TableParquetOptions {
diff --git a/datafusion/common/src/file_options/mod.rs b/datafusion/common/src/file_options/mod.rs
index 77781457d0..02667e0165 100644
--- a/datafusion/common/src/file_options/mod.rs
+++ b/datafusion/common/src/file_options/mod.rs
@@ -30,7 +30,6 @@ pub mod parquet_writer;
mod tests {
use std::collections::HashMap;
- use super::parquet_writer::ParquetWriterOptions;
use crate::{
config::{ConfigFileType, TableOptions},
        file_options::{csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions},
@@ -40,7 +39,7 @@ mod tests {
use parquet::{
basic::{Compression, Encoding, ZstdLevel},
- file::properties::{EnabledStatistics, WriterVersion},
+        file::properties::{EnabledStatistics, WriterPropertiesBuilder, WriterVersion},
schema::types::ColumnPath,
};
@@ -79,8 +78,10 @@ mod tests {
table_config.set_config_format(ConfigFileType::PARQUET);
table_config.alter_with_string_hash_map(&option_map)?;
-        let parquet_options = ParquetWriterOptions::try_from(&table_config.parquet)?;
- let properties = parquet_options.writer_options();
+ let properties = WriterPropertiesBuilder::try_from(
+ &table_config.parquet.with_skip_arrow_metadata(true),
+ )?
+ .build();
        // Verify the expected options propagated down to parquet crate WriterProperties struct
assert_eq!(properties.max_row_group_size(), 123);
@@ -184,8 +185,10 @@ mod tests {
table_config.set_config_format(ConfigFileType::PARQUET);
table_config.alter_with_string_hash_map(&option_map)?;
-        let parquet_options = ParquetWriterOptions::try_from(&table_config.parquet)?;
- let properties = parquet_options.writer_options();
+ let properties = WriterPropertiesBuilder::try_from(
+ &table_config.parquet.with_skip_arrow_metadata(true),
+ )?
+ .build();
let col1 = ColumnPath::from(vec!["col1".to_owned()]);
        let col2_nested = ColumnPath::from(vec!["col2".to_owned(), "nested".to_owned()]);
diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs
index dd9d67d6bb..46bce06470 100644
--- a/datafusion/common/src/file_options/parquet_writer.rs
+++ b/datafusion/common/src/file_options/parquet_writer.rs
@@ -17,18 +17,25 @@
//! Options related to how parquet files should be written
+use base64::Engine;
+use std::sync::Arc;
+
use crate::{
config::{ParquetOptions, TableParquetOptions},
- DataFusionError, Result,
+ DataFusionError, Result, _internal_datafusion_err,
};
+use arrow_schema::Schema;
use parquet::{
+ arrow::ARROW_SCHEMA_META_KEY,
basic::{BrotliLevel, GzipLevel, ZstdLevel},
- file::properties::{
-        EnabledStatistics, WriterProperties, WriterPropertiesBuilder, WriterVersion,
- DEFAULT_MAX_STATISTICS_SIZE, DEFAULT_STATISTICS_ENABLED,
+ file::{
+ metadata::KeyValue,
+ properties::{
+            EnabledStatistics, WriterProperties, WriterPropertiesBuilder, WriterVersion,
+ DEFAULT_MAX_STATISTICS_SIZE, DEFAULT_STATISTICS_ENABLED,
+ },
},
- format::KeyValue,
schema::types::ColumnPath,
};
@@ -51,6 +58,17 @@ impl ParquetWriterOptions {
}
}
+impl TableParquetOptions {
+ /// Add the arrow schema to the parquet kv_metadata.
+ /// If already exists, then overwrites.
+ pub fn arrow_schema(&mut self, schema: &Arc<Schema>) {
+ self.key_value_metadata.insert(
+ ARROW_SCHEMA_META_KEY.into(),
+ Some(encode_arrow_schema(schema)),
+ );
+ }
+}
+
impl TryFrom<&TableParquetOptions> for ParquetWriterOptions {
type Error = DataFusionError;
@@ -79,6 +97,14 @@ impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder {
let mut builder = global.into_writer_properties_builder()?;
+        // check that the arrow schema is present in the kv_metadata, if configured to do so
+ if !global.skip_arrow_metadata
+ && !key_value_metadata.contains_key(ARROW_SCHEMA_META_KEY)
+ {
+            return Err(_internal_datafusion_err!("arrow schema was not added to the kv_metadata, even though it is required by configuration settings"));
+ }
+
+ // add kv_meta, if any
if !key_value_metadata.is_empty() {
builder = builder.set_key_value_metadata(Some(
key_value_metadata
@@ -140,11 +166,38 @@ impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder {
}
}
+/// Encodes the Arrow schema into the IPC format, and base64 encodes it
+///
+/// TODO: use extern parquet's private method, once publicly available.
+/// Refer to <https://github.com/apache/arrow-rs/pull/6916>
+fn encode_arrow_schema(schema: &Arc<Schema>) -> String {
+ let options = arrow_ipc::writer::IpcWriteOptions::default();
+    let mut dictionary_tracker = arrow_ipc::writer::DictionaryTracker::new(true);
+ let data_gen = arrow_ipc::writer::IpcDataGenerator::default();
+    let mut serialized_schema = data_gen.schema_to_bytes_with_dictionary_tracker(
+ schema,
+ &mut dictionary_tracker,
+ &options,
+ );
+
+    // manually prepending the length to the schema as arrow uses the legacy IPC format
+ // TODO: change after addressing ARROW-9777
+ let schema_len = serialized_schema.ipc_message.len();
+ let mut len_prefix_schema = Vec::with_capacity(schema_len + 8);
+ len_prefix_schema.append(&mut vec![255u8, 255, 255, 255]);
+    len_prefix_schema.append((schema_len as u32).to_le_bytes().to_vec().as_mut());
+ len_prefix_schema.append(&mut serialized_schema.ipc_message);
+
+ base64::prelude::BASE64_STANDARD.encode(&len_prefix_schema)
+}
+
impl ParquetOptions {
    /// Convert the global session options, [`ParquetOptions`], into a single write action's [`WriterPropertiesBuilder`].
///
    /// The returned [`WriterPropertiesBuilder`] can then be further modified with additional options
    /// applied per column; a customization which is not applicable for [`ParquetOptions`].
+ ///
+    /// Note that this method does not include the key_value_metadata from [`TableParquetOptions`].
    pub fn into_writer_properties_builder(&self) -> Result<WriterPropertiesBuilder> {
let ParquetOptions {
data_pagesize_limit,
@@ -177,6 +230,7 @@ impl ParquetOptions {
bloom_filter_on_read: _, // reads not used for writer props
schema_force_view_types: _,
binary_as_string: _, // not used for writer props
+ skip_arrow_metadata: _,
} = self;
let mut builder = WriterProperties::builder()
@@ -444,6 +498,7 @@ mod tests {
bloom_filter_on_read: defaults.bloom_filter_on_read,
schema_force_view_types: defaults.schema_force_view_types,
binary_as_string: defaults.binary_as_string,
+ skip_arrow_metadata: defaults.skip_arrow_metadata,
}
}
@@ -546,19 +601,55 @@ mod tests {
                bloom_filter_on_read: global_options_defaults.bloom_filter_on_read,
                schema_force_view_types: global_options_defaults.schema_force_view_types,
binary_as_string: global_options_defaults.binary_as_string,
+                skip_arrow_metadata: global_options_defaults.skip_arrow_metadata,
},
column_specific_options,
key_value_metadata,
}
}
+ #[test]
+ fn table_parquet_opts_to_writer_props_skip_arrow_metadata() {
+ // TableParquetOptions, all props set to default
+ let mut table_parquet_opts = TableParquetOptions::default();
+ assert!(
+ !table_parquet_opts.global.skip_arrow_metadata,
+ "default false, to not skip the arrow schema requirement"
+ );
+
+ // see errors without the schema added, using default settings
+        let should_error = WriterPropertiesBuilder::try_from(&table_parquet_opts);
+ assert!(
+ should_error.is_err(),
+ "should error without the required arrow schema in kv_metadata",
+ );
+
+ // succeeds if we permit skipping the arrow schema
+ table_parquet_opts = table_parquet_opts.with_skip_arrow_metadata(true);
+        let should_succeed = WriterPropertiesBuilder::try_from(&table_parquet_opts);
+ assert!(
+ should_succeed.is_ok(),
+ "should work with the arrow schema skipped by config",
+ );
+
+ // Set the arrow schema back to required
+        table_parquet_opts = table_parquet_opts.with_skip_arrow_metadata(false);
+ // add the arrow schema to the kv_meta
+ table_parquet_opts.arrow_schema(&Arc::new(Schema::empty()));
+        let should_succeed = WriterPropertiesBuilder::try_from(&table_parquet_opts);
+ assert!(
+ should_succeed.is_ok(),
+            "should work with the arrow schema included in TableParquetOptions",
+ );
+ }
+
#[test]
fn table_parquet_opts_to_writer_props() {
// ParquetOptions, all props set to non-default
let parquet_options = parquet_options_with_non_defaults();
// TableParquetOptions, using ParquetOptions for global settings
- let key = "foo".to_string();
+ let key = ARROW_SCHEMA_META_KEY.to_string();
let value = Some("bar".into());
let table_parquet_opts = TableParquetOptions {
global: parquet_options.clone(),
@@ -585,7 +676,7 @@ mod tests {
#[test]
fn test_defaults_match() {
// ensure the global settings are the same
- let default_table_writer_opts = TableParquetOptions::default();
+ let mut default_table_writer_opts = TableParquetOptions::default();
let default_parquet_opts = ParquetOptions::default();
assert_eq!(
default_table_writer_opts.global,
@@ -593,6 +684,10 @@ mod tests {
"should have matching defaults for TableParquetOptions.global and
ParquetOptions",
);
+        // selectively skip the arrow_schema metadata, since the WriterProperties default has an empty kv_meta (no arrow schema)
+ default_table_writer_opts =
+ default_table_writer_opts.with_skip_arrow_metadata(true);
+
// WriterProperties::default, a.k.a. using extern parquet's defaults
let default_writer_props = WriterProperties::new();
@@ -640,6 +735,7 @@ mod tests {
session_config_from_writer_props(&default_writer_props);
from_extern_parquet.global.created_by = same_created_by;
from_extern_parquet.global.compression = Some("zstd(3)".into());
+ from_extern_parquet.global.skip_arrow_metadata = true;
assert_eq!(
default_table_writer_opts,
@@ -653,6 +749,7 @@ mod tests {
        // the TableParquetOptions::default, with only the bloom filter turned on
let mut default_table_writer_opts = TableParquetOptions::default();
default_table_writer_opts.global.bloom_filter_on_write = true;
+        default_table_writer_opts.arrow_schema(&Arc::new(Schema::empty())); // add the required arrow schema
let from_datafusion_defaults =
WriterPropertiesBuilder::try_from(&default_table_writer_opts)
.unwrap()
@@ -681,6 +778,7 @@ mod tests {
let mut default_table_writer_opts = TableParquetOptions::default();
default_table_writer_opts.global.bloom_filter_on_write = true;
default_table_writer_opts.global.bloom_filter_fpp = Some(0.42);
+        default_table_writer_opts.arrow_schema(&Arc::new(Schema::empty())); // add the required arrow schema
let from_datafusion_defaults =
WriterPropertiesBuilder::try_from(&default_table_writer_opts)
.unwrap()
@@ -713,6 +811,7 @@ mod tests {
let mut default_table_writer_opts = TableParquetOptions::default();
default_table_writer_opts.global.bloom_filter_on_write = true;
default_table_writer_opts.global.bloom_filter_ndv = Some(42);
+        default_table_writer_opts.arrow_schema(&Arc::new(Schema::empty())); // add the required arrow schema
let from_datafusion_defaults =
WriterPropertiesBuilder::try_from(&default_table_writer_opts)
.unwrap()
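
To summarize the new contract introduced in this file, here is a short standalone sketch (assumptions: the same crates imported above are available; it mirrors the new `table_parquet_opts_to_writer_props_skip_arrow_metadata` test rather than being code from the commit):

    use std::sync::Arc;
    use arrow_schema::Schema;
    use datafusion_common::config::TableParquetOptions;
    use parquet::file::properties::WriterPropertiesBuilder;

    fn main() {
        let mut opts = TableParquetOptions::default();

        // By default the conversion now requires the arrow schema to already be
        // present in the kv_metadata, so this errors:
        assert!(WriterPropertiesBuilder::try_from(&opts).is_err());

        // Embedding the schema first (as ParquetSink now does) makes it succeed:
        opts.arrow_schema(&Arc::new(Schema::empty()));
        assert!(WriterPropertiesBuilder::try_from(&opts).is_ok());
    }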
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 383fd65752..8f64bea39d 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -45,7 +45,6 @@ use crate::physical_plan::{
use arrow::compute::sum;
use datafusion_common::config::{ConfigField, ConfigFileType, TableParquetOptions};
-use datafusion_common::file_options::parquet_writer::ParquetWriterOptions;
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::stats::Precision;
use datafusion_common::{
@@ -68,13 +67,13 @@ use log::debug;
use object_store::buffered::BufWriter;
use parquet::arrow::arrow_writer::{
compute_leaves, get_column_writers, ArrowColumnChunk, ArrowColumnWriter,
- ArrowLeafColumn,
+ ArrowLeafColumn, ArrowWriterOptions,
};
use parquet::arrow::{
arrow_to_parquet_schema, parquet_to_arrow_schema, AsyncArrowWriter,
};
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData};
-use parquet::file::properties::WriterProperties;
+use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
use parquet::file::writer::SerializedFileWriter;
use parquet::format::FileMetaData;
use tokio::io::{AsyncWrite, AsyncWriteExt};
@@ -750,6 +749,28 @@ impl ParquetSink {
}
}
+ /// Create writer properties based upon configuration settings,
+ /// including partitioning and the inclusion of arrow schema metadata.
+ fn create_writer_props(&self) -> Result<WriterProperties> {
+        let schema = if self.parquet_options.global.allow_single_file_parallelism {
+            // If parallelizing writes, we may also be doing hive-style partitioning
+ // into multiple files which impacts the schema per file.
+ // Refer to `self.get_writer_schema()`
+ &self.get_writer_schema()
+ } else {
+ self.config.output_schema()
+ };
+
+        // TODO: avoid this clone in a follow-up PR, where the writer properties & schema
+ // are calculated once on `ParquetSink::new`
+ let mut parquet_opts = self.parquet_options.clone();
+ if !self.parquet_options.global.skip_arrow_metadata {
+ parquet_opts.arrow_schema(schema);
+ }
+
+ Ok(WriterPropertiesBuilder::try_from(&parquet_opts)?.build())
+ }
+
    /// Creates an AsyncArrowWriter which serializes a parquet file to an ObjectStore
    /// AsyncArrowWriters are used when individual parquet file serialization is not parallelized
async fn create_async_arrow_writer(
@@ -759,10 +780,14 @@ impl ParquetSink {
parquet_props: WriterProperties,
) -> Result<AsyncArrowWriter<BufWriter>> {
let buf_writer = BufWriter::new(object_store, location.clone());
- let writer = AsyncArrowWriter::try_new(
+ let options = ArrowWriterOptions::new()
+ .with_properties(parquet_props)
+            .with_skip_arrow_metadata(self.parquet_options.global.skip_arrow_metadata);
+
+ let writer = AsyncArrowWriter::try_new_with_options(
buf_writer,
self.get_writer_schema(),
- Some(parquet_props),
+ options,
)?;
Ok(writer)
}
@@ -788,7 +813,7 @@ impl DataSink for ParquetSink {
data: SendableRecordBatchStream,
context: &Arc<TaskContext>,
) -> Result<u64> {
-        let parquet_props = ParquetWriterOptions::try_from(&self.parquet_options)?;
+ let parquet_props = self.create_writer_props()?;
let object_store = context
.runtime_env()
@@ -832,7 +857,7 @@ impl DataSink for ParquetSink {
.create_async_arrow_writer(
&path,
Arc::clone(&object_store),
- parquet_props.writer_options().clone(),
+ parquet_props.clone(),
)
.await?;
let mut reservation =
@@ -867,7 +892,7 @@ impl DataSink for ParquetSink {
writer,
rx,
schema,
- props.writer_options(),
+ &props,
parallel_options_clone,
pool,
)
@@ -2335,42 +2360,74 @@ mod tests {
async fn parquet_sink_write() -> Result<()> {
let parquet_sink = create_written_parquet_sink("file:///").await?;
- // assert written
- let mut written = parquet_sink.written();
- let written = written.drain();
- assert_eq!(
- written.len(),
- 1,
- "expected a single parquet files to be written, instead found {}",
- written.len()
- );
+ // assert written to proper path
+ let (path, file_metadata) = get_written(parquet_sink)?;
+ let path_parts = path.parts().collect::<Vec<_>>();
+ assert_eq!(path_parts.len(), 1, "should not have path prefix");
// check the file metadata
- let (
- path,
- FileMetaData {
- num_rows,
- schema,
- key_value_metadata,
- ..
+ let expected_kv_meta = vec![
+ // default is to include arrow schema
+ KeyValue {
+ key: "ARROW:schema".to_string(),
+ value: Some(ENCODED_ARROW_SCHEMA.to_string()),
+ },
+ KeyValue {
+ key: "my-data".to_string(),
+ value: Some("stuff".to_string()),
+ },
+ KeyValue {
+ key: "my-data-bool-key".to_string(),
+ value: None,
},
- ) = written.take(1).next().unwrap();
+ ];
+ assert_file_metadata(file_metadata, &expected_kv_meta);
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn parquet_sink_parallel_write() -> Result<()> {
+ let opts = ParquetOptions {
+ allow_single_file_parallelism: true,
+ maximum_parallel_row_group_writers: 2,
+ maximum_buffered_record_batches_per_stream: 2,
+ ..Default::default()
+ };
+
+ let parquet_sink =
+ create_written_parquet_sink_using_config("file:///", opts).await?;
+
+ // assert written to proper path
+ let (path, file_metadata) = get_written(parquet_sink)?;
let path_parts = path.parts().collect::<Vec<_>>();
assert_eq!(path_parts.len(), 1, "should not have path prefix");
- assert_eq!(num_rows, 2, "file metadata to have 2 rows");
- assert!(
- schema.iter().any(|col_schema| col_schema.name == "a"),
- "output file metadata should contain col a"
- );
- assert!(
- schema.iter().any(|col_schema| col_schema.name == "b"),
- "output file metadata should contain col b"
- );
+ // check the file metadata
+ let expected_kv_meta = vec![
+ // default is to include arrow schema
+ KeyValue {
+ key: "ARROW:schema".to_string(),
+ value: Some(ENCODED_ARROW_SCHEMA.to_string()),
+ },
+ KeyValue {
+ key: "my-data".to_string(),
+ value: Some("stuff".to_string()),
+ },
+ KeyValue {
+ key: "my-data-bool-key".to_string(),
+ value: None,
+ },
+ ];
+ assert_file_metadata(file_metadata, &expected_kv_meta);
- let mut key_value_metadata = key_value_metadata.unwrap();
- key_value_metadata.sort_by(|a, b| a.key.cmp(&b.key));
- let expected_metadata = vec![
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn parquet_sink_write_insert_schema_into_metadata() -> Result<()> {
+ // expected kv metadata without schema
+ let expected_without = vec![
KeyValue {
key: "my-data".to_string(),
value: Some("stuff".to_string()),
@@ -2380,7 +2437,63 @@ mod tests {
value: None,
},
];
- assert_eq!(key_value_metadata, expected_metadata);
+ // expected kv metadata with schema
+ let expected_with = [
+ vec![KeyValue {
+ key: "ARROW:schema".to_string(),
+ value: Some(ENCODED_ARROW_SCHEMA.to_string()),
+ }],
+ expected_without.clone(),
+ ]
+ .concat();
+
+ // single threaded write, skip insert
+ let opts = ParquetOptions {
+ allow_single_file_parallelism: false,
+ skip_arrow_metadata: true,
+ ..Default::default()
+ };
+ let parquet_sink =
+ create_written_parquet_sink_using_config("file:///", opts).await?;
+ let (_, file_metadata) = get_written(parquet_sink)?;
+ assert_file_metadata(file_metadata, &expected_without);
+
+ // single threaded write, do not skip insert
+ let opts = ParquetOptions {
+ allow_single_file_parallelism: false,
+ skip_arrow_metadata: false,
+ ..Default::default()
+ };
+ let parquet_sink =
+ create_written_parquet_sink_using_config("file:///", opts).await?;
+ let (_, file_metadata) = get_written(parquet_sink)?;
+ assert_file_metadata(file_metadata, &expected_with);
+
+ // multithreaded write, skip insert
+ let opts = ParquetOptions {
+ allow_single_file_parallelism: true,
+ maximum_parallel_row_group_writers: 2,
+ maximum_buffered_record_batches_per_stream: 2,
+ skip_arrow_metadata: true,
+ ..Default::default()
+ };
+ let parquet_sink =
+ create_written_parquet_sink_using_config("file:///", opts).await?;
+ let (_, file_metadata) = get_written(parquet_sink)?;
+ assert_file_metadata(file_metadata, &expected_without);
+
+ // multithreaded write, do not skip insert
+ let opts = ParquetOptions {
+ allow_single_file_parallelism: true,
+ maximum_parallel_row_group_writers: 2,
+ maximum_buffered_record_batches_per_stream: 2,
+ skip_arrow_metadata: false,
+ ..Default::default()
+ };
+ let parquet_sink =
+ create_written_parquet_sink_using_config("file:///", opts).await?;
+ let (_, file_metadata) = get_written(parquet_sink)?;
+ assert_file_metadata(file_metadata, &expected_with);
Ok(())
}
@@ -2391,18 +2504,8 @@ mod tests {
let file_path = format!("file:///path/to/{}", filename);
let parquet_sink =
create_written_parquet_sink(file_path.as_str()).await?;
- // assert written
- let mut written = parquet_sink.written();
- let written = written.drain();
- assert_eq!(
- written.len(),
- 1,
- "expected a single parquet file to be written, instead found {}",
- written.len()
- );
-
- let (path, ..) = written.take(1).next().unwrap();
-
+ // assert written to proper path
+ let (path, _) = get_written(parquet_sink)?;
let path_parts = path.parts().collect::<Vec<_>>();
assert_eq!(
path_parts.len(),
@@ -2420,18 +2523,8 @@ mod tests {
let file_path = "file:///path/to";
let parquet_sink = create_written_parquet_sink(file_path).await?;
- // assert written
- let mut written = parquet_sink.written();
- let written = written.drain();
- assert_eq!(
- written.len(),
- 1,
- "expected a single parquet file to be written, instead found {}",
- written.len()
- );
-
- let (path, ..) = written.take(1).next().unwrap();
-
+ // assert written to proper path
+ let (path, _) = get_written(parquet_sink)?;
let path_parts = path.parts().collect::<Vec<_>>();
assert_eq!(
path_parts.len(),
@@ -2449,18 +2542,8 @@ mod tests {
let file_path = "file:///path/to/";
let parquet_sink = create_written_parquet_sink(file_path).await?;
- // assert written
- let mut written = parquet_sink.written();
- let written = written.drain();
- assert_eq!(
- written.len(),
- 1,
- "expected a single parquet file to be written, instead found {}",
- written.len()
- );
-
- let (path, ..) = written.take(1).next().unwrap();
-
+ // assert written to proper path
+ let (path, _) = get_written(parquet_sink)?;
let path_parts = path.parts().collect::<Vec<_>>();
assert_eq!(
path_parts.len(),
@@ -2474,6 +2557,17 @@ mod tests {
}
    async fn create_written_parquet_sink(table_path: &str) -> Result<Arc<ParquetSink>> {
+        create_written_parquet_sink_using_config(table_path, ParquetOptions::default())
+ .await
+ }
+
+ static ENCODED_ARROW_SCHEMA: &str =
"/////5QAAAAQAAAAAAAKAAwACgAJAAQACgAAABAAAAAAAQQACAAIAAAABAAIAAAABAAAAAIAAAA8AAAABAAAANz///8UAAAADAAAAAAAAAUMAAAAAAAAAMz///8BAAAAYgAAABAAFAAQAAAADwAEAAAACAAQAAAAGAAAAAwAAAAAAAAFEAAAAAAAAAAEAAQABAAAAAEAAABhAAAA";
+
+ async fn create_written_parquet_sink_using_config(
+ table_path: &str,
+ global: ParquetOptions,
+ ) -> Result<Arc<ParquetSink>> {
+        // schema should match the ENCODED_ARROW_SCHEMA above
let field_a = Field::new("a", DataType::Utf8, false);
let field_b = Field::new("b", DataType::Utf8, false);
let schema = Arc::new(Schema::new(vec![field_a, field_b]));
@@ -2495,6 +2589,7 @@ mod tests {
("my-data".to_string(), Some("stuff".to_string())),
("my-data-bool-key".to_string(), None),
]),
+ global,
..Default::default()
},
));
@@ -2519,6 +2614,42 @@ mod tests {
Ok(parquet_sink)
}
+    fn get_written(parquet_sink: Arc<ParquetSink>) -> Result<(Path, FileMetaData)> {
+ let mut written = parquet_sink.written();
+ let written = written.drain();
+ assert_eq!(
+ written.len(),
+ 1,
+            "expected a single parquet file to be written, instead found {}",
+ written.len()
+ );
+
+ let (path, file_metadata) = written.take(1).next().unwrap();
+ Ok((path, file_metadata))
+ }
+
+    fn assert_file_metadata(file_metadata: FileMetaData, expected_kv: &Vec<KeyValue>) {
+ let FileMetaData {
+ num_rows,
+ schema,
+ key_value_metadata,
+ ..
+ } = file_metadata;
+ assert_eq!(num_rows, 2, "file metadata to have 2 rows");
+ assert!(
+ schema.iter().any(|col_schema| col_schema.name == "a"),
+ "output file metadata should contain col a"
+ );
+ assert!(
+ schema.iter().any(|col_schema| col_schema.name == "b"),
+ "output file metadata should contain col b"
+ );
+
+ let mut key_value_metadata = key_value_metadata.unwrap();
+ key_value_metadata.sort_by(|a, b| a.key.cmp(&b.key));
+ assert_eq!(&key_value_metadata, expected_kv);
+ }
+
#[tokio::test]
async fn parquet_sink_write_partitions() -> Result<()> {
let field_a = Field::new("a", DataType::Utf8, false);
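
As a side note on the writer wiring changed in this file, here is a minimal standalone sketch (an illustration, not the sink code itself) of how the skip flag is forwarded to the arrow writer alongside the computed properties:

    use parquet::arrow::arrow_writer::ArrowWriterOptions;
    use parquet::file::properties::WriterProperties;

    fn main() {
        // Illustrative value; in ParquetSink this comes from
        // `parquet_options.global.skip_arrow_metadata`.
        let skip_arrow_metadata = true;

        // The WriterProperties built from TableParquetOptions (which may already
        // carry the encoded schema in its kv_metadata) and the skip flag are both
        // handed to the writer via ArrowWriterOptions, which is then passed to
        // AsyncArrowWriter::try_new_with_options.
        let _options = ArrowWriterOptions::new()
            .with_properties(WriterProperties::new())
            .with_skip_arrow_metadata(skip_arrow_metadata);
    }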
diff --git a/datafusion/proto-common/proto/datafusion_common.proto b/datafusion/proto-common/proto/datafusion_common.proto
index 69626f97fd..6a7dc1604b 100644
--- a/datafusion/proto-common/proto/datafusion_common.proto
+++ b/datafusion/proto-common/proto/datafusion_common.proto
@@ -497,6 +497,7 @@ message ParquetOptions {
bool bloom_filter_on_write = 27; // default = false
bool schema_force_view_types = 28; // default = false
bool binary_as_string = 29; // default = false
+ bool skip_arrow_metadata = 30; // default = false
oneof metadata_size_hint_opt {
uint64 metadata_size_hint = 4;
diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs
index eb6976aa0c..ca8306275b 100644
--- a/datafusion/proto-common/src/from_proto/mod.rs
+++ b/datafusion/proto-common/src/from_proto/mod.rs
@@ -962,6 +962,7 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions {
maximum_buffered_record_batches_per_stream:
value.maximum_buffered_record_batches_per_stream as usize,
schema_force_view_types: value.schema_force_view_types,
binary_as_string: value.binary_as_string,
+ skip_arrow_metadata: value.skip_arrow_metadata,
})
}
}
diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs
index e88c1497af..e9f9de09d4 100644
--- a/datafusion/proto-common/src/generated/pbjson.rs
+++ b/datafusion/proto-common/src/generated/pbjson.rs
@@ -4940,6 +4940,9 @@ impl serde::Serialize for ParquetOptions {
if self.binary_as_string {
len += 1;
}
+ if self.skip_arrow_metadata {
+ len += 1;
+ }
if self.dictionary_page_size_limit != 0 {
len += 1;
}
@@ -5033,6 +5036,9 @@ impl serde::Serialize for ParquetOptions {
if self.binary_as_string {
struct_ser.serialize_field("binaryAsString",
&self.binary_as_string)?;
}
+ if self.skip_arrow_metadata {
+            struct_ser.serialize_field("skipArrowMetadata", &self.skip_arrow_metadata)?;
+ }
if self.dictionary_page_size_limit != 0 {
#[allow(clippy::needless_borrow)]
#[allow(clippy::needless_borrows_for_generic_args)]
@@ -5161,6 +5167,8 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
"schemaForceViewTypes",
"binary_as_string",
"binaryAsString",
+ "skip_arrow_metadata",
+ "skipArrowMetadata",
"dictionary_page_size_limit",
"dictionaryPageSizeLimit",
"data_page_row_count_limit",
@@ -5204,6 +5212,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
BloomFilterOnWrite,
SchemaForceViewTypes,
BinaryAsString,
+ SkipArrowMetadata,
DictionaryPageSizeLimit,
DataPageRowCountLimit,
MaxRowGroupSize,
@@ -5253,6 +5262,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
"bloomFilterOnWrite" | "bloom_filter_on_write" =>
Ok(GeneratedField::BloomFilterOnWrite),
"schemaForceViewTypes" | "schema_force_view_types"
=> Ok(GeneratedField::SchemaForceViewTypes),
"binaryAsString" | "binary_as_string" =>
Ok(GeneratedField::BinaryAsString),
+                            "skipArrowMetadata" | "skip_arrow_metadata" => Ok(GeneratedField::SkipArrowMetadata),
"dictionaryPageSizeLimit" |
"dictionary_page_size_limit" => Ok(GeneratedField::DictionaryPageSizeLimit),
"dataPageRowCountLimit" |
"data_page_row_count_limit" => Ok(GeneratedField::DataPageRowCountLimit),
"maxRowGroupSize" | "max_row_group_size" =>
Ok(GeneratedField::MaxRowGroupSize),
@@ -5300,6 +5310,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
let mut bloom_filter_on_write__ = None;
let mut schema_force_view_types__ = None;
let mut binary_as_string__ = None;
+ let mut skip_arrow_metadata__ = None;
let mut dictionary_page_size_limit__ = None;
let mut data_page_row_count_limit__ = None;
let mut max_row_group_size__ = None;
@@ -5413,6 +5424,12 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
}
binary_as_string__ = Some(map_.next_value()?);
}
+ GeneratedField::SkipArrowMetadata => {
+ if skip_arrow_metadata__.is_some() {
+                                return Err(serde::de::Error::duplicate_field("skipArrowMetadata"));
+ }
+ skip_arrow_metadata__ = Some(map_.next_value()?);
+ }
GeneratedField::DictionaryPageSizeLimit => {
if dictionary_page_size_limit__.is_some() {
return
Err(serde::de::Error::duplicate_field("dictionaryPageSizeLimit"));
@@ -5515,6 +5532,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
bloom_filter_on_write:
bloom_filter_on_write__.unwrap_or_default(),
schema_force_view_types:
schema_force_view_types__.unwrap_or_default(),
binary_as_string: binary_as_string__.unwrap_or_default(),
+                    skip_arrow_metadata: skip_arrow_metadata__.unwrap_or_default(),
dictionary_page_size_limit:
dictionary_page_size_limit__.unwrap_or_default(),
data_page_row_count_limit:
data_page_row_count_limit__.unwrap_or_default(),
max_row_group_size:
max_row_group_size__.unwrap_or_default(),
diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs
index 6b85097758..3263c1c755 100644
--- a/datafusion/proto-common/src/generated/prost.rs
+++ b/datafusion/proto-common/src/generated/prost.rs
@@ -763,6 +763,9 @@ pub struct ParquetOptions {
/// default = false
#[prost(bool, tag = "29")]
pub binary_as_string: bool,
+ /// default = false
+ #[prost(bool, tag = "30")]
+ pub skip_arrow_metadata: bool,
#[prost(uint64, tag = "12")]
pub dictionary_page_size_limit: u64,
#[prost(uint64, tag = "18")]
diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs
index a7cea607cb..79faaba864 100644
--- a/datafusion/proto-common/src/to_proto/mod.rs
+++ b/datafusion/proto-common/src/to_proto/mod.rs
@@ -833,6 +833,7 @@ impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions {
maximum_buffered_record_batches_per_stream:
value.maximum_buffered_record_batches_per_stream as u64,
schema_force_view_types: value.schema_force_view_types,
binary_as_string: value.binary_as_string,
+ skip_arrow_metadata: value.skip_arrow_metadata,
})
}
}
diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs b/datafusion/proto/src/generated/datafusion_proto_common.rs
index 6b85097758..3263c1c755 100644
--- a/datafusion/proto/src/generated/datafusion_proto_common.rs
+++ b/datafusion/proto/src/generated/datafusion_proto_common.rs
@@ -763,6 +763,9 @@ pub struct ParquetOptions {
/// default = false
#[prost(bool, tag = "29")]
pub binary_as_string: bool,
+ /// default = false
+ #[prost(bool, tag = "30")]
+ pub skip_arrow_metadata: bool,
#[prost(uint64, tag = "12")]
pub dictionary_page_size_limit: u64,
#[prost(uint64, tag = "18")]
diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs
index 62405b2fef..772e6d2342 100644
--- a/datafusion/proto/src/logical_plan/file_formats.rs
+++ b/datafusion/proto/src/logical_plan/file_formats.rs
@@ -410,6 +410,7 @@ impl TableParquetOptionsProto {
maximum_buffered_record_batches_per_stream:
global_options.global.maximum_buffered_record_batches_per_stream as u64,
schema_force_view_types:
global_options.global.schema_force_view_types,
binary_as_string: global_options.global.binary_as_string,
+ skip_arrow_metadata: global_options.global.skip_arrow_metadata,
}),
column_specific_options:
column_specific_options.into_iter().map(|(column_name, options)| {
ParquetColumnSpecificOptions {
@@ -501,6 +502,7 @@ impl From<&ParquetOptionsProto> for ParquetOptions {
maximum_buffered_record_batches_per_stream:
proto.maximum_buffered_record_batches_per_stream as usize,
schema_force_view_types: proto.schema_force_view_types,
binary_as_string: proto.binary_as_string,
+ skip_arrow_metadata: proto.skip_arrow_metadata,
}
}
}
diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt
index 7d70cd9db5..46618b32d7 100644
--- a/datafusion/sqllogictest/test_files/information_schema.slt
+++ b/datafusion/sqllogictest/test_files/information_schema.slt
@@ -215,6 +215,7 @@ datafusion.execution.parquet.pruning true
datafusion.execution.parquet.pushdown_filters false
datafusion.execution.parquet.reorder_filters false
datafusion.execution.parquet.schema_force_view_types true
+datafusion.execution.parquet.skip_arrow_metadata false
datafusion.execution.parquet.skip_metadata true
datafusion.execution.parquet.statistics_enabled page
datafusion.execution.parquet.write_batch_size 1024
@@ -308,6 +309,7 @@ datafusion.execution.parquet.pruning true (reading) If true, the parquet reader
datafusion.execution.parquet.pushdown_filters false (reading) If true, filter
expressions are be applied during the parquet decoding operation to reduce the
number of rows decoded. This optimization is sometimes called "late
materialization".
datafusion.execution.parquet.reorder_filters false (reading) If true, filter
expressions evaluated during the parquet decoding operation will be reordered
heuristically to minimize the cost of evaluation. If false, the filters are
applied in the same order as written in the query
datafusion.execution.parquet.schema_force_view_types true (reading) If true,
parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, and
`Binary/BinaryLarge` with `BinaryView`.
+datafusion.execution.parquet.skip_arrow_metadata false (writing) Skip encoding the embedded arrow metadata in the KV_meta This is analogous to the `ArrowWriterOptions::with_skip_arrow_metadata`. Refer to <https://docs.rs/parquet/53.3.0/parquet/arrow/arrow_writer/struct.ArrowWriterOptions.html#method.with_skip_arrow_metadata>
datafusion.execution.parquet.skip_metadata true (reading) If true, the parquet
reader skip the optional embedded metadata that may be in the file Schema. This
setting can help avoid schema conflicts when querying multiple parquet files
with schemas containing compatible types but different metadata
datafusion.execution.parquet.statistics_enabled page (writing) Sets if
statistics are enabled for any column Valid values are: "none", "chunk", and
"page" These values are not case sensitive. If NULL, uses default parquet
writer setting
datafusion.execution.parquet.write_batch_size 1024 (writing) Sets
write_batch_size in bytes
diff --git a/datafusion/sqllogictest/test_files/repartition_scan.slt b/datafusion/sqllogictest/test_files/repartition_scan.slt
index a1db84b878..9ba96e985f 100644
--- a/datafusion/sqllogictest/test_files/repartition_scan.slt
+++ b/datafusion/sqllogictest/test_files/repartition_scan.slt
@@ -61,7 +61,7 @@ logical_plan
physical_plan
01)CoalesceBatchesExec: target_batch_size=8192
02)--FilterExec: column1@0 != 42
-03)----ParquetExec: file_groups={4 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..88],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:88..176],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:176..264],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:264..351]]},
projec [...]
+03)----ParquetExec: file_groups={4 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..137],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:137..274],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:274..411],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:411..547]]},
proj [...]
# disable round robin repartitioning
statement ok
@@ -77,7 +77,7 @@ logical_plan
physical_plan
01)CoalesceBatchesExec: target_batch_size=8192
02)--FilterExec: column1@0 != 42
-03)----ParquetExec: file_groups={4 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..88],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:88..176],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:176..264],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:264..351]]},
projec [...]
+03)----ParquetExec: file_groups={4 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..137],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:137..274],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:274..411],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:411..547]]},
proj [...]
# enable round robin repartitioning again
statement ok
@@ -102,7 +102,7 @@ physical_plan
02)--SortExec: expr=[column1@0 ASC NULLS LAST], preserve_partitioning=[true]
03)----CoalesceBatchesExec: target_batch_size=8192
04)------FilterExec: column1@0 != 42
-05)--------ParquetExec: file_groups={4 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..174],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:174..342,
WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..6],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:6..180],
[WORKSPACE [...]
+05)--------ParquetExec: file_groups={4 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..272],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:272..538,
WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..6],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:6..278],
[WORKSPACE [...]
## Read the files as though they are ordered
@@ -138,7 +138,7 @@ physical_plan
01)SortPreservingMergeExec: [column1@0 ASC NULLS LAST]
02)--CoalesceBatchesExec: target_batch_size=8192
03)----FilterExec: column1@0 != 42
-04)------ParquetExec: file_groups={4 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..171],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..175],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:175..351],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:171..342]]},
proj [...]
+04)------ParquetExec: file_groups={4 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..269],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..273],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:273..547],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:269..538]]},
proj [...]
# Cleanup
statement ok
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index 329b9a95c8..1c39064c15 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -61,6 +61,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus
| datafusion.execution.parquet.data_pagesize_limit |
1048576 | (writing) Sets best effort maximum size of data
page in bytes
[...]
| datafusion.execution.parquet.write_batch_size |
1024 | (writing) Sets write_batch_size in bytes
[...]
| datafusion.execution.parquet.writer_version |
1.0 | (writing) Sets parquet writer version valid values
are "1.0" and "2.0"
[...]
+| datafusion.execution.parquet.skip_arrow_metadata | false | (writing) Skip encoding the embedded arrow metadata in the KV_meta This is analogous to the `ArrowWriterOptions::with_skip_arrow_metadata`. Refer to <https://docs.rs/parquet/53.3.0/parquet/arrow/arrow_writer/struct.ArrowWriterOptions.html#method.with_skip_arrow_metadata> [...]
| datafusion.execution.parquet.compression |
zstd(3) | (writing) Sets default parquet compression codec.
Valid values are: uncompressed, snappy, gzip(level), lzo, brotli(level), lz4,
zstd(level), and lz4_raw. These values are not case sensitive. If NULL, uses
default parquet writer setting Note that this default setting is not the same
as the default parquet writer setting.
[...]
| datafusion.execution.parquet.dictionary_enabled |
true | (writing) Sets if dictionary encoding is enabled.
If NULL, uses default parquet writer setting
[...]
| datafusion.execution.parquet.dictionary_page_size_limit |
1048576 | (writing) Sets best effort maximum dictionary page
size, in bytes
[...]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]