This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch 
gh-readonly-queue/main/pr-22649-2ec0ab50e39970592c8e7d25b330c92c68c3fca8
in repository https://gitbox.apache.org/repos/asf/datafusion.git

commit 84bc8761ac3a126e41658b6cd0ec6bd8cc34cda8
Author: Daipayan Mukherjee <[email protected]>
AuthorDate: Fri Jun 5 10:10:21 2026 +0100

    feat: add max_row_group_bytes option to ParquetOptions (#22649)
    
    ## Which issue does this PR close?
    
    <!--
    We generally require a GitHub issue to be filed for all bug fixes and
    enhancements and this helps us generate change logs for our releases.
    You can link an issue to this PR using the GitHub syntax. For example
    `Closes #123` indicates that this PR will close issue #123.
    -->
    
    - Closes https://github.com/apache/datafusion/issues/22650.
    
    ## Rationale for this change
    
    <!--
    Why are you proposing this change? If this is already explained clearly
    in the issue then this section is not needed.
    Explaining clearly why changes are proposed helps reviewers understand
    your changes and offer better suggestions for fixes.
    -->
    
    arrow-rs 58.0 added WriterProperties::set_max_row_group_bytes (PR:
    apache/arrow-rs#9357
    Issue: apache/arrow-rs#1213), which flushes a row group when either the
    row-count or the byte limit is reached, whichever comes first, matching
    parquet-mr's parquet.block.size. DataFusion already consumes atleast
    this version of arrow but does not yet expose this new byte-based setter
    through its config.
    
    ## What changes are included in this PR?
    
    <!--
    There is no need to duplicate the description in the issue here but it
    is sometimes worth providing a summary of the individual changes in this
    PR.
    -->
    
    - Add `max_row_group_bytes: Option<usize>` (default None) to
    ParquetOptions in `datafusion/common/src/config.rs`.
    - Wire it through `ParquetOptions::into_writer_properties_builder` to
    `WriterPropertiesBuilder::set_max_row_group_bytes`, with a guard that
    rejects Some(0) as a configuration error (arrow-rs panics on a zero byte
    limit).
    - Plumb the field through protobuf serialization - add it to the
    ParquetOptions proto message and the proto-common/proto conversions,
    with regenerated bindings.
    - Exposed as the max_row_group_bytes COPY / CREATE EXTERNAL TABLE format
    option alongside max_row_group_size.
    - Update the generated config docs and the format options table doc.
    
    ## Are these changes tested?
    
    <!--
    We typically require tests for all PRs in order to:
    1. Prevent the code from being accidentally broken by subsequent changes
    2. Serve as another way to document the expected behavior of the code
    
    If tests are not included in your PR, please explain why (for example,
    are they covered by existing tests)?
    -->
    
    Yes - run locally and passing:
    
    Unit (datafusion-common, parquet_writer.rs):
    - defaults to None, so no byte limit is propagated to WriterProperties.
    - a configured value propagates to WriterProperties.
    - Some(0) is rejected with a configuration error.
    - the existing table_parquet_opts_to_writer_props round-trip and
    test_defaults_match tests were extended to cover the new field.
    
    Protobuf round-trip (datafusion-proto-common):
    - new test_parquet_options_max_row_group_bytes_round_trip confirms the
    option survives serialization to protobuf and back.
    
    SLTs:
    - new test_files/parquet_max_row_group_bytes.slt writes Parquet with the
    option set (via both COPY ... OPTIONS and session config), reads it
    back, asserts the data round-trips, and asserts a zero value is
    rejected.
    - copy.slt exercises the option inside the existing "all supported
    statement overrides" COPY test.
    - information_schema.slt updated for the new option in SHOW ALL.
    
    Commands run locally (all pass):
    cargo test -p datafusion-common --features parquet
    cargo test -p datafusion-proto-common
    cargo test -p datafusion-proto
    cargo test --test sqllogictests -- parquet_max_row_group_bytes
    cargo test --test sqllogictests -- information_schema
    cargo test --test sqllogictests -- copy
    
    ## Are there any user-facing changes?
    
    <!--
    If there are user-facing changes then we may require documentation to be
    updated before approving the PR.
    -->
    
    Additive only, does not affect existing options.
    
    <!--
    If there are any breaking changes to public APIs, please add the `api
    change` label.
    -->
    
    ---------
    
    Co-authored-by: Yongting You <[email protected]>
---
 datafusion/common/src/config.rs                    | 126 +++++++++++++-
 .../common/src/file_options/parquet_writer.rs      |  29 +++-
 .../proto-common/proto/datafusion_common.proto     |   4 +
 datafusion/proto-common/src/from_proto/mod.rs      |  24 ++-
 datafusion/proto-common/src/generated/pbjson.rs    |  24 +++
 datafusion/proto-common/src/generated/prost.rs     |   9 +
 datafusion/proto-common/src/to_proto/mod.rs        |   1 +
 .../src/generated/datafusion_proto_common.rs       |   9 +
 datafusion/proto/src/logical_plan/file_formats.rs  |  14 +-
 datafusion/sqllogictest/test_files/copy.slt        |   1 +
 .../sqllogictest/test_files/information_schema.slt |   4 +-
 .../test_files/parquet_max_row_group_bytes.slt     | 186 +++++++++++++++++++++
 docs/source/user-guide/configs.md                  |   3 +-
 docs/source/user-guide/sql/format_options.md       |   1 +
 14 files changed, 427 insertions(+), 8 deletions(-)

diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index e3e92caef3..ab1405054c 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -803,6 +803,73 @@ impl ParquetCdcOptions {
     }
 }
 
+/// Target maximum size of a Parquet row group in bytes.
+///
+/// Wraps a `usize` so the "must be greater than zero" constraint (arrow-rs
+/// panics on a zero byte limit) is validated when the config is set, rather
+/// than when the writer properties are built.
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+pub struct MaxRowGroupBytes(usize);
+
+impl MaxRowGroupBytes {
+    /// Creates a `MaxRowGroupBytes`, rejecting zero.
+    pub fn try_new(value: usize) -> Result<Self> {
+        if value == 0 {
+            return Err(DataFusionError::Configuration(
+                "max_row_group_bytes must be greater than 0".to_string(),
+            ));
+        }
+        Ok(Self(value))
+    }
+
+    /// Returns the configured byte limit.
+    pub fn get(&self) -> usize {
+        self.0
+    }
+}
+
+impl FromStr for MaxRowGroupBytes {
+    type Err = DataFusionError;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        let value = s.parse::<usize>().map_err(|_| {
+            DataFusionError::Configuration(format!(
+                "Invalid max_row_group_bytes: '{s}'. Expected a positive 
integer."
+            ))
+        })?;
+        Self::try_new(value)
+    }
+}
+
+impl Display for MaxRowGroupBytes {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "{}", self.0)
+    }
+}
+
+/// `ConfigField` for `Option<MaxRowGroupBytes>`. A custom impl (rather than 
the
+/// blanket `Option<F>` one) so an invalid value is rejected without leaving 
the
+/// option in an invalid intermediate state on error. `MaxRowGroupBytes`
+/// deliberately does not implement `Default`, so the blanket impl does not 
apply.
+impl ConfigField for Option<MaxRowGroupBytes> {
+    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) 
{
+        match self {
+            Some(s) => v.some(key, s, description),
+            None => v.none(key, description),
+        }
+    }
+
+    fn set(&mut self, _key: &str, value: &str) -> Result<()> {
+        *self = Some(MaxRowGroupBytes::from_str(value)?);
+        Ok(())
+    }
+
+    fn reset(&mut self, _key: &str) -> Result<()> {
+        *self = None;
+        Ok(())
+    }
+}
+
 config_namespace! {
     /// Options for reading and writing parquet files
     ///
@@ -936,9 +1003,21 @@ config_namespace! {
 
         /// (writing) Target maximum number of rows in each row group 
(defaults to 1M
         /// rows). Writing larger row groups requires more memory to write, but
-        /// can get better compression and be faster to read.
+        /// can get better compression and be faster to read. When
+        /// `max_row_group_bytes` is also set, the writer flushes a row group 
when
+        /// either limit is reached, whichever comes first.
         pub max_row_group_size: usize, default =  1024 * 1024
 
+        /// (writing) Target maximum size of each row group in bytes. When set,
+        /// the writer flushes whenever either this limit or 
`max_row_group_size`
+        /// is reached, whichever comes first. Useful for bounding writer 
memory
+        /// on wide schemas where a row-count limit can map to very different
+        /// byte sizes. Matches the behavior of `parquet.block.size` in
+        /// parquet-mr. If `None` (the default), only the row-count limit
+        /// applies. Currently only honored when 
`allow_single_file_parallelism`
+        /// is `false`; by default the parallel file writer ignores this limit.
+        pub max_row_group_bytes: Option<MaxRowGroupBytes>, default = None
+
         /// (writing) Sets "created by" property
         pub created_by: String, default = concat!("datafusion version ", 
env!("CARGO_PKG_VERSION")).into()
 
@@ -4081,4 +4160,49 @@ mod tests {
         assert_eq!(cdc.max_chunk_size, 1024 * 1024);
         assert_eq!(cdc.norm_level, 0);
     }
+
+    #[test]
+    fn max_row_group_bytes_rejects_zero() {
+        use crate::config::MaxRowGroupBytes;
+        use std::str::FromStr;
+
+        assert!(MaxRowGroupBytes::try_new(0).is_err());
+        assert!(MaxRowGroupBytes::from_str("0").is_err());
+        assert!(MaxRowGroupBytes::from_str("not_a_number").is_err());
+        assert_eq!(MaxRowGroupBytes::try_new(128).unwrap().get(), 128);
+        assert_eq!(MaxRowGroupBytes::from_str("128").unwrap().get(), 128);
+    }
+
+    #[test]
+    fn parquet_max_row_group_bytes_config_set_rejects_zero() {
+        use crate::config::ConfigOptions;
+
+        let mut options = ConfigOptions::new();
+        options
+            .set("datafusion.execution.parquet.max_row_group_bytes", "1024")
+            .unwrap();
+        assert_eq!(
+            options
+                .execution
+                .parquet
+                .max_row_group_bytes
+                .map(|v| v.get()),
+            Some(1024)
+        );
+
+        // Zero is rejected at set time, leaving the previous value unchanged.
+        assert!(
+            options
+                .set("datafusion.execution.parquet.max_row_group_bytes", "0")
+                .is_err()
+        );
+        assert_eq!(
+            options
+                .execution
+                .parquet
+                .max_row_group_bytes
+                .map(|v| v.get()),
+            Some(1024)
+        );
+    }
 }
diff --git a/datafusion/common/src/file_options/parquet_writer.rs 
b/datafusion/common/src/file_options/parquet_writer.rs
index d0a3cecdb8..a5b270a8f5 100644
--- a/datafusion/common/src/file_options/parquet_writer.rs
+++ b/datafusion/common/src/file_options/parquet_writer.rs
@@ -219,6 +219,7 @@ impl ParquetOptions {
             dictionary_page_size_limit,
             statistics_enabled,
             max_row_group_size,
+            max_row_group_bytes,
             created_by,
             column_index_truncate_length,
             statistics_truncate_length,
@@ -261,6 +262,7 @@ impl ParquetOptions {
                     .unwrap_or(DEFAULT_STATISTICS_ENABLED),
             )
             .set_max_row_group_row_count(Some(*max_row_group_size))
+            .set_max_row_group_bytes(max_row_group_bytes.as_ref().map(|v| 
v.get()))
             .set_created_by(created_by.clone())
             .set_column_index_truncate_length(*column_index_truncate_length)
             .set_statistics_truncate_length(*statistics_truncate_length)
@@ -428,7 +430,8 @@ mod tests {
     #[cfg(feature = "parquet_encryption")]
     use crate::config::ConfigFileEncryptionProperties;
     use crate::config::{
-        ParquetCdcOptions, ParquetColumnOptions, ParquetEncryptionOptions, 
ParquetOptions,
+        MaxRowGroupBytes, ParquetCdcOptions, ParquetColumnOptions,
+        ParquetEncryptionOptions, ParquetOptions,
     };
     use crate::parquet_config::DFParquetWriterVersion;
     use parquet::basic::Compression;
@@ -473,6 +476,7 @@ mod tests {
             dictionary_page_size_limit: 42,
             statistics_enabled: Some("chunk".into()),
             max_row_group_size: 42,
+            max_row_group_bytes: Some(MaxRowGroupBytes::try_new(42).unwrap()),
             created_by: "wordy".into(),
             column_index_truncate_length: Some(42),
             statistics_truncate_length: Some(42),
@@ -582,6 +586,9 @@ mod tests {
                 max_row_group_size: props
                     .max_row_group_row_count()
                     .unwrap_or(DEFAULT_MAX_ROW_GROUP_ROW_COUNT),
+                max_row_group_bytes: props
+                    .max_row_group_bytes()
+                    .and_then(|v| MaxRowGroupBytes::try_new(v).ok()),
                 created_by: props.created_by().to_string(),
                 column_index_truncate_length: 
props.column_index_truncate_length(),
                 statistics_truncate_length: props.statistics_truncate_length(),
@@ -895,6 +902,26 @@ mod tests {
         assert_eq!(cdc.norm_level, -1);
     }
 
+    #[test]
+    fn test_max_row_group_bytes_disabled_by_default() {
+        let mut opts = TableParquetOptions::default();
+        opts.arrow_schema(&Arc::new(Schema::empty()));
+
+        let props = WriterPropertiesBuilder::try_from(&opts).unwrap().build();
+        assert_eq!(props.max_row_group_bytes(), None);
+    }
+
+    #[test]
+    fn test_max_row_group_bytes_propagated_to_writer_props() {
+        let mut opts = TableParquetOptions::default();
+        opts.global.max_row_group_bytes =
+            Some(MaxRowGroupBytes::try_new(64 * 1024 * 1024).unwrap());
+        opts.arrow_schema(&Arc::new(Schema::empty()));
+
+        let props = WriterPropertiesBuilder::try_from(&opts).unwrap().build();
+        assert_eq!(props.max_row_group_bytes(), Some(64 * 1024 * 1024));
+    }
+
     #[test]
     fn test_bloom_filter_set_ndv_only() {
         // the TableParquetOptions::default, with only ndv set
diff --git a/datafusion/proto-common/proto/datafusion_common.proto 
b/datafusion/proto-common/proto/datafusion_common.proto
index 9ad4068264..7fff5b6b71 100644
--- a/datafusion/proto-common/proto/datafusion_common.proto
+++ b/datafusion/proto-common/proto/datafusion_common.proto
@@ -627,6 +627,10 @@ message ParquetOptions {
     uint64 max_predicate_cache_size = 33;
   }
 
+  oneof max_row_group_bytes_opt {
+    uint64 max_row_group_bytes = 37;
+  }
+
   ParquetCdcOptions content_defined_chunking = 35;
 
   // Optional timezone applied to INT96-coerced timestamps when `coerce_int96`
diff --git a/datafusion/proto-common/src/from_proto/mod.rs 
b/datafusion/proto-common/src/from_proto/mod.rs
index a241ec2266..97cc9af230 100644
--- a/datafusion/proto-common/src/from_proto/mod.rs
+++ b/datafusion/proto-common/src/from_proto/mod.rs
@@ -39,8 +39,8 @@ use datafusion_common::{
     DataFusionError, JoinSide, ScalarValue, Statistics, TableReference,
     arrow_datafusion_err,
     config::{
-        CsvOptions, JsonOptions, ParquetCdcOptions, ParquetColumnOptions, 
ParquetOptions,
-        TableParquetOptions,
+        CsvOptions, JsonOptions, MaxRowGroupBytes, ParquetCdcOptions,
+        ParquetColumnOptions, ParquetOptions, TableParquetOptions,
     },
     file_options::{csv_writer::CsvWriterOptions, 
json_writer::JsonWriterOptions},
     parsers::CompressionTypeVariant,
@@ -1130,6 +1130,9 @@ impl TryFrom<&protobuf::ParquetOptions> for 
ParquetOptions {
             max_predicate_cache_size: 
value.max_predicate_cache_size_opt.map(|opt| match opt {
                 
protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v) 
=> Some(v as usize),
             }).unwrap_or(None),
+            max_row_group_bytes: value.max_row_group_bytes_opt.and_then(|opt| 
match opt {
+                
protobuf::parquet_options::MaxRowGroupBytesOpt::MaxRowGroupBytes(v) => 
MaxRowGroupBytes::try_new(v as usize).ok(),
+            }),
             content_defined_chunking: 
value.content_defined_chunking.map(ParquetCdcOptions::from).unwrap_or_default(),
         })
     }
@@ -1331,7 +1334,7 @@ pub(crate) fn csv_writer_options_from_proto(
 #[cfg(test)]
 mod tests {
     use datafusion_common::config::{
-        ParquetCdcOptions, ParquetOptions, TableParquetOptions,
+        MaxRowGroupBytes, ParquetCdcOptions, ParquetOptions, 
TableParquetOptions,
     };
 
     fn parquet_options_proto_round_trip(opts: ParquetOptions) -> 
ParquetOptions {
@@ -1376,6 +1379,21 @@ mod tests {
         assert_eq!(recovered.coerce_int96_tz, Some("UTC".to_string()));
     }
 
+    #[test]
+    fn test_parquet_options_max_row_group_bytes_round_trip() {
+        let opts = ParquetOptions {
+            max_row_group_bytes: Some(
+                MaxRowGroupBytes::try_new(64 * 1024 * 1024).unwrap(),
+            ),
+            ..ParquetOptions::default()
+        };
+        let recovered = parquet_options_proto_round_trip(opts.clone());
+        assert_eq!(
+            recovered.max_row_group_bytes.map(|v| v.get()),
+            Some(64 * 1024 * 1024)
+        );
+    }
+
     #[test]
     fn test_table_parquet_options_coerce_int96_tz_round_trip() {
         let mut opts = TableParquetOptions::default();
diff --git a/datafusion/proto-common/src/generated/pbjson.rs 
b/datafusion/proto-common/src/generated/pbjson.rs
index 83e29929d1..963faa5a3e 100644
--- a/datafusion/proto-common/src/generated/pbjson.rs
+++ b/datafusion/proto-common/src/generated/pbjson.rs
@@ -6448,6 +6448,9 @@ impl serde::Serialize for ParquetOptions {
         if self.max_predicate_cache_size_opt.is_some() {
             len += 1;
         }
+        if self.max_row_group_bytes_opt.is_some() {
+            len += 1;
+        }
         if self.coerce_int96_tz_opt.is_some() {
             len += 1;
         }
@@ -6619,6 +6622,15 @@ impl serde::Serialize for ParquetOptions {
                 }
             }
         }
+        if let Some(v) = self.max_row_group_bytes_opt.as_ref() {
+            match v {
+                parquet_options::MaxRowGroupBytesOpt::MaxRowGroupBytes(v) => {
+                    #[allow(clippy::needless_borrow)]
+                    #[allow(clippy::needless_borrows_for_generic_args)]
+                    struct_ser.serialize_field("maxRowGroupBytes", 
ToString::to_string(&v).as_str())?;
+                }
+            }
+        }
         if let Some(v) = self.coerce_int96_tz_opt.as_ref() {
             match v {
                 parquet_options::CoerceInt96TzOpt::CoerceInt96Tz(v) => {
@@ -6699,6 +6711,8 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
             "coerceInt96",
             "max_predicate_cache_size",
             "maxPredicateCacheSize",
+            "max_row_group_bytes",
+            "maxRowGroupBytes",
             "coerce_int96_tz",
             "coerceInt96Tz",
         ];
@@ -6738,6 +6752,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
             BloomFilterNdv,
             CoerceInt96,
             MaxPredicateCacheSize,
+            MaxRowGroupBytes,
             CoerceInt96Tz,
         }
         impl<'de> serde::Deserialize<'de> for GeneratedField {
@@ -6793,6 +6808,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
                             "bloomFilterNdv" | "bloom_filter_ndv" => 
Ok(GeneratedField::BloomFilterNdv),
                             "coerceInt96" | "coerce_int96" => 
Ok(GeneratedField::CoerceInt96),
                             "maxPredicateCacheSize" | 
"max_predicate_cache_size" => Ok(GeneratedField::MaxPredicateCacheSize),
+                            "maxRowGroupBytes" | "max_row_group_bytes" => 
Ok(GeneratedField::MaxRowGroupBytes),
                             "coerceInt96Tz" | "coerce_int96_tz" => 
Ok(GeneratedField::CoerceInt96Tz),
                             _ => Err(serde::de::Error::unknown_field(value, 
FIELDS)),
                         }
@@ -6846,6 +6862,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
                 let mut bloom_filter_ndv_opt__ = None;
                 let mut coerce_int96_opt__ = None;
                 let mut max_predicate_cache_size_opt__ = None;
+                let mut max_row_group_bytes_opt__ = None;
                 let mut coerce_int96_tz_opt__ = None;
                 while let Some(k) = map_.next_key()? {
                     match k {
@@ -7061,6 +7078,12 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
                             }
                             max_predicate_cache_size_opt__ = 
map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x|
 parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(x.0));
                         }
+                        GeneratedField::MaxRowGroupBytes => {
+                            if max_row_group_bytes_opt__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("maxRowGroupBytes"));
+                            }
+                            max_row_group_bytes_opt__ = 
map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x|
 parquet_options::MaxRowGroupBytesOpt::MaxRowGroupBytes(x.0));
+                        }
                         GeneratedField::CoerceInt96Tz => {
                             if coerce_int96_tz_opt__.is_some() {
                                 return 
Err(serde::de::Error::duplicate_field("coerceInt96Tz"));
@@ -7103,6 +7126,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
                     bloom_filter_ndv_opt: bloom_filter_ndv_opt__,
                     coerce_int96_opt: coerce_int96_opt__,
                     max_predicate_cache_size_opt: 
max_predicate_cache_size_opt__,
+                    max_row_group_bytes_opt: max_row_group_bytes_opt__,
                     coerce_int96_tz_opt: coerce_int96_tz_opt__,
                 })
             }
diff --git a/datafusion/proto-common/src/generated/prost.rs 
b/datafusion/proto-common/src/generated/prost.rs
index ae34f9b264..93b97c4f13 100644
--- a/datafusion/proto-common/src/generated/prost.rs
+++ b/datafusion/proto-common/src/generated/prost.rs
@@ -900,6 +900,10 @@ pub struct ParquetOptions {
     pub max_predicate_cache_size_opt: ::core::option::Option<
         parquet_options::MaxPredicateCacheSizeOpt,
     >,
+    #[prost(oneof = "parquet_options::MaxRowGroupBytesOpt", tags = "37")]
+    pub max_row_group_bytes_opt: ::core::option::Option<
+        parquet_options::MaxRowGroupBytesOpt,
+    >,
     /// Optional timezone applied to INT96-coerced timestamps when 
`coerce_int96`
     /// is set. When `Some`, INT96 columns coerce to
     /// `Timestamp(<coerce_int96>, Some(<tz>))` instead of the default
@@ -964,6 +968,11 @@ pub mod parquet_options {
         #[prost(uint64, tag = "33")]
         MaxPredicateCacheSize(u64),
     }
+    #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Oneof)]
+    pub enum MaxRowGroupBytesOpt {
+        #[prost(uint64, tag = "37")]
+        MaxRowGroupBytes(u64),
+    }
     /// Optional timezone applied to INT96-coerced timestamps when 
`coerce_int96`
     /// is set. When `Some`, INT96 columns coerce to
     /// `Timestamp(<coerce_int96>, Some(<tz>))` instead of the default
diff --git a/datafusion/proto-common/src/to_proto/mod.rs 
b/datafusion/proto-common/src/to_proto/mod.rs
index ef675690a9..a6fa13ca74 100644
--- a/datafusion/proto-common/src/to_proto/mod.rs
+++ b/datafusion/proto-common/src/to_proto/mod.rs
@@ -938,6 +938,7 @@ impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions {
             coerce_int96_opt: 
value.coerce_int96.clone().map(protobuf::parquet_options::CoerceInt96Opt::CoerceInt96),
             coerce_int96_tz_opt: 
value.coerce_int96_tz.clone().map(protobuf::parquet_options::CoerceInt96TzOpt::CoerceInt96Tz),
             max_predicate_cache_size_opt: 
value.max_predicate_cache_size.map(|v| 
protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v as 
u64)),
+            max_row_group_bytes_opt: value.max_row_group_bytes.map(|v| 
protobuf::parquet_options::MaxRowGroupBytesOpt::MaxRowGroupBytes(v.get() as 
u64)),
             content_defined_chunking: 
Some((&value.content_defined_chunking).into()),
         })
     }
diff --git a/datafusion/proto-models/src/generated/datafusion_proto_common.rs 
b/datafusion/proto-models/src/generated/datafusion_proto_common.rs
index ae34f9b264..93b97c4f13 100644
--- a/datafusion/proto-models/src/generated/datafusion_proto_common.rs
+++ b/datafusion/proto-models/src/generated/datafusion_proto_common.rs
@@ -900,6 +900,10 @@ pub struct ParquetOptions {
     pub max_predicate_cache_size_opt: ::core::option::Option<
         parquet_options::MaxPredicateCacheSizeOpt,
     >,
+    #[prost(oneof = "parquet_options::MaxRowGroupBytesOpt", tags = "37")]
+    pub max_row_group_bytes_opt: ::core::option::Option<
+        parquet_options::MaxRowGroupBytesOpt,
+    >,
     /// Optional timezone applied to INT96-coerced timestamps when 
`coerce_int96`
     /// is set. When `Some`, INT96 columns coerce to
     /// `Timestamp(<coerce_int96>, Some(<tz>))` instead of the default
@@ -964,6 +968,11 @@ pub mod parquet_options {
         #[prost(uint64, tag = "33")]
         MaxPredicateCacheSize(u64),
     }
+    #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Oneof)]
+    pub enum MaxRowGroupBytesOpt {
+        #[prost(uint64, tag = "37")]
+        MaxRowGroupBytes(u64),
+    }
     /// Optional timezone applied to INT96-coerced timestamps when 
`coerce_int96`
     /// is set. When `Some`, INT96 columns coerce to
     /// `Timestamp(<coerce_int96>, Some(<tz>))` instead of the default
diff --git a/datafusion/proto/src/logical_plan/file_formats.rs 
b/datafusion/proto/src/logical_plan/file_formats.rs
index bb709d3fcc..8e71cc9268 100644
--- a/datafusion/proto/src/logical_plan/file_formats.rs
+++ b/datafusion/proto/src/logical_plan/file_formats.rs
@@ -383,7 +383,8 @@ mod parquet {
         parquet_options,
     };
     use datafusion_common::config::{
-        ParquetCdcOptions, ParquetColumnOptions, ParquetOptions, 
TableParquetOptions,
+        MaxRowGroupBytes, ParquetCdcOptions, ParquetColumnOptions, 
ParquetOptions,
+        TableParquetOptions,
     };
     use datafusion_datasource_parquet::file_format::ParquetFormatFactory;
 
@@ -455,6 +456,9 @@ mod parquet {
                 max_predicate_cache_size_opt: 
global_options.global.max_predicate_cache_size.map(|size| {
                     
parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(size as u64)
                 }),
+                max_row_group_bytes_opt: 
global_options.global.max_row_group_bytes.map(|size| {
+                    
parquet_options::MaxRowGroupBytesOpt::MaxRowGroupBytes(size.get() as u64)
+                }),
                 content_defined_chunking: Some(ParquetCdcOptionsProto {
                     enabled: 
global_options.global.content_defined_chunking.enabled,
                     min_chunk_size: 
global_options.global.content_defined_chunking.min_chunk_size as u64,
@@ -628,6 +632,14 @@ mod parquet {
                             size,
                         ) => *size as usize,
                     }),
+                max_row_group_bytes: proto
+                    .max_row_group_bytes_opt
+                    .as_ref()
+                    .and_then(|opt| match opt {
+                        
parquet_options::MaxRowGroupBytesOpt::MaxRowGroupBytes(size) => {
+                            MaxRowGroupBytes::try_new(*size as usize).ok()
+                        }
+                    }),
                 content_defined_chunking: proto
                     .content_defined_chunking
                     .map(ParquetCdcOptions::from_proto)
diff --git a/datafusion/sqllogictest/test_files/copy.slt 
b/datafusion/sqllogictest/test_files/copy.slt
index 402ac8e851..7aa7269b58 100644
--- a/datafusion/sqllogictest/test_files/copy.slt
+++ b/datafusion/sqllogictest/test_files/copy.slt
@@ -326,6 +326,7 @@ OPTIONS (
 'format.compression::col1' 'zstd(5)',
 'format.compression::col2' snappy,
 'format.max_row_group_size' 12345,
+'format.max_row_group_bytes' 2048,
 'format.data_pagesize_limit' 1234,
 'format.write_batch_size' 1234,
 'format.writer_version' 2.0,
diff --git a/datafusion/sqllogictest/test_files/information_schema.slt 
b/datafusion/sqllogictest/test_files/information_schema.slt
index 991732641c..370492c2eb 100644
--- a/datafusion/sqllogictest/test_files/information_schema.slt
+++ b/datafusion/sqllogictest/test_files/information_schema.slt
@@ -252,6 +252,7 @@ datafusion.execution.parquet.enable_page_index true
 datafusion.execution.parquet.encoding NULL
 datafusion.execution.parquet.force_filter_selections false
 datafusion.execution.parquet.max_predicate_cache_size NULL
+datafusion.execution.parquet.max_row_group_bytes NULL
 datafusion.execution.parquet.max_row_group_size 1048576
 datafusion.execution.parquet.maximum_buffered_record_batches_per_stream 2
 datafusion.execution.parquet.maximum_parallel_row_group_writers 1
@@ -407,7 +408,8 @@ datafusion.execution.parquet.enable_page_index true 
(reading) If true, reads the
 datafusion.execution.parquet.encoding NULL (writing)  Sets default encoding 
for any column. Valid values are: plain, plain_dictionary, rle, bit_packed, 
delta_binary_packed, delta_length_byte_array, delta_byte_array, rle_dictionary, 
and byte_stream_split. These values are not case sensitive. If NULL, uses 
default parquet writer setting
 datafusion.execution.parquet.force_filter_selections false (reading) Force the 
use of RowSelections for filter results, when pushdown_filters is enabled. If 
false, the reader will automatically choose between a RowSelection and a Bitmap 
based on the number and pattern of selected rows.
 datafusion.execution.parquet.max_predicate_cache_size NULL (reading) The 
maximum predicate cache size, in bytes. When `pushdown_filters` is enabled, 
sets the maximum memory used to cache the results of predicate evaluation 
between filter evaluation and output generation. Decreasing this value will 
reduce memory usage, but may increase IO and CPU usage. None means use the 
default parquet reader setting. 0 means no caching.
-datafusion.execution.parquet.max_row_group_size 1048576 (writing) Target 
maximum number of rows in each row group (defaults to 1M rows). Writing larger 
row groups requires more memory to write, but can get better compression and be 
faster to read.
+datafusion.execution.parquet.max_row_group_bytes NULL (writing) Target maximum 
size of each row group in bytes. When set, the writer flushes whenever either 
this limit or `max_row_group_size` is reached, whichever comes first. Useful 
for bounding writer memory on wide schemas where a row-count limit can map to 
very different byte sizes. Matches the behavior of `parquet.block.size` in 
parquet-mr. If `None` (the default), only the row-count limit applies. 
Currently only honored when `allow [...]
+datafusion.execution.parquet.max_row_group_size 1048576 (writing) Target 
maximum number of rows in each row group (defaults to 1M rows). Writing larger 
row groups requires more memory to write, but can get better compression and be 
faster to read. When `max_row_group_bytes` is also set, the writer flushes a 
row group when either limit is reached, whichever comes first.
 datafusion.execution.parquet.maximum_buffered_record_batches_per_stream 2 
(writing) By default parallel parquet writer is tuned for minimum memory usage 
in a streaming execution plan. You may see a performance benefit when writing 
large parquet files by increasing maximum_parallel_row_group_writers and 
maximum_buffered_record_batches_per_stream if your system has idle cores and 
can tolerate additional memory usage. Boosting these values is likely 
worthwhile when writing out already in-me [...]
 datafusion.execution.parquet.maximum_parallel_row_group_writers 1 (writing) By 
default parallel parquet writer is tuned for minimum memory usage in a 
streaming execution plan. You may see a performance benefit when writing large 
parquet files by increasing maximum_parallel_row_group_writers and 
maximum_buffered_record_batches_per_stream if your system has idle cores and 
can tolerate additional memory usage. Boosting these values is likely 
worthwhile when writing out already in-memory dat [...]
 datafusion.execution.parquet.metadata_size_hint 524288 (reading) If specified, 
the parquet reader will try and fetch the last `size_hint` bytes of the parquet 
file optimistically. If not specified, two reads are required: One read to 
fetch the 8-byte parquet footer and another to fetch the metadata length 
encoded in the footer Default setting to 512 KiB, which should be sufficient 
for most parquet files, it can reduce one I/O operation per parquet file. If 
the metadata is larger than the [...]
diff --git a/datafusion/sqllogictest/test_files/parquet_max_row_group_bytes.slt 
b/datafusion/sqllogictest/test_files/parquet_max_row_group_bytes.slt
new file mode 100644
index 0000000000..8de83329ae
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/parquet_max_row_group_bytes.slt
@@ -0,0 +1,186 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+
+#   http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# End-to-end tests for the `max_row_group_bytes` Parquet writer option:
+# write Parquet files with the option set, then read them back to confirm
+# the option is wired through from config to the writer.
+# See datafusion/common/src/config.rs for the option definition.
+
+statement ok
+CREATE TABLE source_table(id INT, name VARCHAR) AS VALUES
+(1, 'one'), (2, 'two'), (3, 'three'), (4, 'four'), (5, 'five');
+
+# Write with max_row_group_bytes set via COPY format options.
+query I
+COPY source_table
+TO 'test_files/scratch/parquet_max_row_group_bytes/copy_options/'
+STORED AS PARQUET
+OPTIONS ('format.max_row_group_bytes' 1024);
+----
+5
+
+statement ok
+CREATE EXTERNAL TABLE readback_copy_options
+STORED AS PARQUET
+LOCATION 'test_files/scratch/parquet_max_row_group_bytes/copy_options/';
+
+query IT
+SELECT id, name FROM readback_copy_options ORDER BY id;
+----
+1 one
+2 two
+3 three
+4 four
+5 five
+
+# The option also applies when set via the session config (not just COPY 
OPTIONS).
+statement ok
+SET datafusion.execution.parquet.max_row_group_bytes = 2048;
+
+query I
+COPY source_table
+TO 'test_files/scratch/parquet_max_row_group_bytes/session_config/'
+STORED AS PARQUET;
+----
+5
+
+statement ok
+RESET datafusion.execution.parquet.max_row_group_bytes;
+
+statement ok
+CREATE EXTERNAL TABLE readback_session_config
+STORED AS PARQUET
+LOCATION 'test_files/scratch/parquet_max_row_group_bytes/session_config/';
+
+query IT
+SELECT id, name FROM readback_session_config ORDER BY id;
+----
+1 one
+2 two
+3 three
+4 four
+5 five
+
+# A zero byte limit is rejected with a clear configuration error.
+query error DataFusion error: Invalid or Unsupported Configuration: 
max_row_group_bytes must be greater than 0
+COPY source_table
+TO 'test_files/scratch/parquet_max_row_group_bytes/invalid/'
+STORED AS PARQUET
+OPTIONS ('format.max_row_group_bytes' 0);
+
+# -----------------------------------------------------------------------------
+# Row-group-count verification via EXPLAIN ANALYZE.
+#
+# `row_groups_pruned_statistics=N total` reports the number of row groups in 
the
+# written file, so it lets us confirm that `max_row_group_bytes` actually
+# changes how the writer splits row groups, and that combining it with
+# `max_row_group_size` flushes on whichever limit is reached first.
+#
+# NOTE: byte-based flushing is currently honored only by the single-threaded
+# Parquet writer (`AsyncArrowWriter`/`ArrowWriter`), which encodes inline and
+# can therefore observe the in-progress row group's encoded size. The
+# multi-threaded (parallel) writer decides row-group boundaries by row count
+# only and ignores `max_row_group_bytes`, so these cases force the
+# single-threaded path with `allow_single_file_parallelism = false`. Extending
+# the parallel writer to honor the byte limit is a follow-up change.
+# -----------------------------------------------------------------------------
+
+statement ok
+set datafusion.execution.target_partitions = 1;
+
+statement ok
+set datafusion.execution.minimum_parallel_output_files = 1;
+
+statement ok
+set datafusion.execution.batch_size = 1024;
+
+statement ok
+set datafusion.execution.parquet.allow_single_file_parallelism = false;
+
+# Row-count limit only: 4096 rows with max_row_group_size = 1000 -> 5 row 
groups
+# (four full groups of 1000 plus a remainder of 96).
+statement ok
+COPY (SELECT value AS id FROM range(0, 4096))
+TO 'test_files/scratch/parquet_max_row_group_bytes/rg_count_size_only/'
+STORED AS PARQUET
+OPTIONS ('format.max_row_group_size' 1000);
+
+statement ok
+CREATE EXTERNAL TABLE rg_count_size_only
+STORED AS PARQUET
+LOCATION 'test_files/scratch/parquet_max_row_group_bytes/rg_count_size_only/';
+
+query TT
+EXPLAIN ANALYZE SELECT * FROM rg_count_size_only WHERE id >= 0;
+----
+Plan with Metrics
+01)FilterExec: id@0 >= 0<slt:ignore>
+02)--DataSourceExec: <slt:ignore>row_groups_pruned_statistics=5 
total<slt:ignore>
+
+# Both limits set: the byte limit also flushes the sub-1000-row remainders that
+# the row-count limit would otherwise carry into the next batch, so the file is
+# split more finely -> 8 row groups (whichever limit is reached first).
+statement ok
+COPY (SELECT value AS id FROM range(0, 4096))
+TO 'test_files/scratch/parquet_max_row_group_bytes/rg_count_size_and_bytes/'
+STORED AS PARQUET
+OPTIONS ('format.max_row_group_size' 1000, 'format.max_row_group_bytes' 1);
+
+statement ok
+CREATE EXTERNAL TABLE rg_count_size_and_bytes
+STORED AS PARQUET
+LOCATION 
'test_files/scratch/parquet_max_row_group_bytes/rg_count_size_and_bytes/';
+
+query TT
+EXPLAIN ANALYZE SELECT * FROM rg_count_size_and_bytes WHERE id >= 0;
+----
+Plan with Metrics
+01)FilterExec: id@0 >= 0<slt:ignore>
+02)--DataSourceExec: <slt:ignore>row_groups_pruned_statistics=8 
total<slt:ignore>
+
+# Byte limit drives alone: the row-count limit is far larger than the data, so
+# only the byte limit splits. Each 1024-row batch fills a fresh (empty) row
+# group, which is never split mid-batch -> 4 row groups.
+statement ok
+COPY (SELECT value AS id FROM range(0, 4096))
+TO 'test_files/scratch/parquet_max_row_group_bytes/rg_count_bytes_only/'
+STORED AS PARQUET
+OPTIONS ('format.max_row_group_size' 100000, 'format.max_row_group_bytes' 1);
+
+statement ok
+CREATE EXTERNAL TABLE rg_count_bytes_only
+STORED AS PARQUET
+LOCATION 'test_files/scratch/parquet_max_row_group_bytes/rg_count_bytes_only/';
+
+query TT
+EXPLAIN ANALYZE SELECT * FROM rg_count_bytes_only WHERE id >= 0;
+----
+Plan with Metrics
+01)FilterExec: id@0 >= 0<slt:ignore>
+02)--DataSourceExec: <slt:ignore>row_groups_pruned_statistics=4 
total<slt:ignore>
+
+statement ok
+reset datafusion.execution.parquet.allow_single_file_parallelism;
+
+statement ok
+reset datafusion.execution.batch_size;
+
+statement ok
+reset datafusion.execution.minimum_parallel_output_files;
+
+statement ok
+set datafusion.execution.target_partitions = 4;
diff --git a/docs/source/user-guide/configs.md 
b/docs/source/user-guide/configs.md
index 88fbeb3de0..442b72ea9b 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -101,7 +101,8 @@ The following configuration settings are available:
 | datafusion.execution.parquet.dictionary_enabled                         | 
true                      | (writing) Sets if dictionary encoding is enabled. 
If NULL, uses default parquet writer setting                                    
                                                                                
                                                                                
                                                                                
                   [...]
 | datafusion.execution.parquet.dictionary_page_size_limit                 | 
1048576                   | (writing) Sets best effort maximum dictionary page 
size, in bytes                                                                  
                                                                                
                                                                                
                                                                                
                  [...]
 | datafusion.execution.parquet.statistics_enabled                         | 
page                      | (writing) Sets if statistics are enabled for any 
column Valid values are: "none", "chunk", and "page" These values are not case 
sensitive. If NULL, uses default parquet writer setting                         
                                                                                
                                                                                
                     [...]
-| datafusion.execution.parquet.max_row_group_size                         | 
1048576                   | (writing) Target maximum number of rows in each row 
group (defaults to 1M rows). Writing larger row groups requires more memory to 
write, but can get better compression and be faster to read.                    
                                                                                
                                                                                
                  [...]
+| datafusion.execution.parquet.max_row_group_size                         | 
1048576                   | (writing) Target maximum number of rows in each row 
group (defaults to 1M rows). Writing larger row groups requires more memory to 
write, but can get better compression and be faster to read. When 
`max_row_group_bytes` is also set, the writer flushes a row group when either 
limit is reached, whichever comes first.                                        
                                  [...]
+| datafusion.execution.parquet.max_row_group_bytes                        | 
NULL                      | (writing) Target maximum size of each row group in 
bytes. When set, the writer flushes whenever either this limit or 
`max_row_group_size` is reached, whichever comes first. Useful for bounding 
writer memory on wide schemas where a row-count limit can map to very different 
byte sizes. Matches the behavior of `parquet.block.size` in parquet-mr. If 
`None` (the default), only the row-count [...]
 | datafusion.execution.parquet.created_by                                 | 
datafusion version 53.1.0 | (writing) Sets "created by" property                
                                                                                
                                                                                
                                                                                
                                                                                
                 [...]
 | datafusion.execution.parquet.column_index_truncate_length               | 64 
                       | (writing) Sets column index truncate length            
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 | datafusion.execution.parquet.statistics_truncate_length                 | 64 
                       | (writing) Sets statistics truncate length. If NULL, 
uses default parquet writer setting                                             
                                                                                
                                                                                
                                                                                
                 [...]
diff --git a/docs/source/user-guide/sql/format_options.md 
b/docs/source/user-guide/sql/format_options.md
index 46d251c18e..ca79858dae 100644
--- a/docs/source/user-guide/sql/format_options.md
+++ b/docs/source/user-guide/sql/format_options.md
@@ -142,6 +142,7 @@ The following options are available when reading or writing 
Parquet files. If an
 | BLOOM_FILTER_FPP                           | Yes                     | Sets 
bloom filter false positive probability (global or per column).                 
                                                                                
                                                                                
                                                                                
| `'bloom_filter_fpp'` or `'bloom_filter_fpp::col'`     | None                  
   |
 | BLOOM_FILTER_NDV                           | Yes                     | Sets 
bloom filter number of distinct values (global or per column).                  
                                                                                
                                                                                
                                                                                
| `'bloom_filter_ndv'` or `'bloom_filter_ndv::col'`     | None                  
   |
 | MAX_ROW_GROUP_SIZE                         | No                      | Sets 
the maximum number of rows per row group. Larger groups require more memory but 
can improve compression and scan efficiency.                                    
                                                                                
                                                                                
| `'max_row_group_size'`                                | 1048576               
   |
+| MAX_ROW_GROUP_BYTES                        | No                      | Sets 
the maximum size of each row group in bytes. When both this and 
`MAX_ROW_GROUP_SIZE` are set, the row group flushes whenever either limit is 
reached. Mirrors `parquet.block.size` from parquet-mr. Currently only honored 
when `allow_single_file_parallelism` is `false`; by default the parallel file 
writer ignores it.     | `'max_row_group_bytes'`                               
| None                     |
 | ENABLE_PAGE_INDEX                          | No                      | If 
true, reads the Parquet data page level metadata (the Page Index), if present, 
to reduce I/O and decoding.                                                     
                                                                                
                                                                                
   | `'enable_page_index'`                                 | true               
      |
 | PRUNING                                    | No                      | If 
true, enables row group pruning based on min/max statistics.                    
                                                                                
                                                                                
                                                                                
  | `'pruning'`                                           | true                
     |
 | SKIP_METADATA                              | No                      | If 
true, skips optional embedded metadata in the file schema.                      
                                                                                
                                                                                
                                                                                
  | `'skip_metadata'`                                     | true                
     |


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to