This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new cfb21e62 feat(table): configure Parquet DataPage version (#812)
cfb21e62 is described below

commit cfb21e62d8f0017b23f0178e560c5c515c2239d3
Author: Andrei Tserakhau <[email protected]>
AuthorDate: Thu Mar 26 23:03:58 2026 +0100

    feat(table): configure Parquet DataPage version (#812)
    
    Add the write.parquet.page-version table property ("1" or "2", default "2").
    This resolves an incompatibility with consumers that don't support DataPage V2
    (e.g. Snowflake). The default keeps the current V2 behavior for backward
    compatibility.
    
    Fix: https://github.com/apache/iceberg-go/issues/800
---
 table/internal/parquet_files.go      | 18 +++++++++-
 table/internal/parquet_files_test.go | 68 ++++++++++++++++++++++++++++++++++++
 table/properties.go                  |  2 ++
 3 files changed, 87 insertions(+), 1 deletion(-)

diff --git a/table/internal/parquet_files.go b/table/internal/parquet_files.go
index 1a80fd57..a78c2d34 100644
--- a/table/internal/parquet_files.go
+++ b/table/internal/parquet_files.go
@@ -22,6 +22,7 @@ import (
        "errors"
        "fmt"
        "io"
+       "log/slog"
        "maps"
        "slices"
        "strconv"
@@ -55,6 +56,8 @@ const (
        ParquetPageRowLimitDefault               = 20000
        ParquetDictSizeBytesKey                  = 
"write.parquet.dict-size-bytes"
        ParquetDictSizeBytesDefault              = 2 * 1024 * 1024 // 2 MB
+       ParquetPageVersionKey                    = "write.parquet.page-version"
+       ParquetPageVersionDefault                = "2"
        ParquetCompressionKey                    = 
"write.parquet.compression-codec"
        ParquetCompressionDefault                = "zstd"
        ParquetCompressionLevelKey               = 
"write.parquet.compression-level"
@@ -201,13 +204,26 @@ func (parquetFormat) PrimitiveTypeToPhysicalType(typ 
iceberg.PrimitiveType) stri
 }
 
 func (parquetFormat) GetWriteProperties(props iceberg.Properties) any {
+       pageVersion := props.Get(ParquetPageVersionKey, 
ParquetPageVersionDefault)
+
+       var dpVersion parquet.DataPageVersion
+       switch pageVersion {
+       case "1":
+               dpVersion = parquet.DataPageV1
+       case "2":
+               dpVersion = parquet.DataPageV2
+       default:
+               slog.Warn("unrecognized data page version, falling back to v2", 
"version", pageVersion)
+               dpVersion = parquet.DataPageV2
+       }
+
        writerProps := []parquet.WriterProperty{
                parquet.WithDictionaryDefault(false),
                
parquet.WithMaxRowGroupLength(int64(props.GetInt(ParquetRowGroupLimitKey,
                        ParquetRowGroupLimitDefault))),
                
parquet.WithDataPageSize(int64(props.GetInt(ParquetPageSizeBytesKey,
                        ParquetPageSizeBytesDefault))),
-               parquet.WithDataPageVersion(parquet.DataPageV2),
+               parquet.WithDataPageVersion(dpVersion),
                parquet.WithBatchSize(int64(props.GetInt(ParquetPageRowLimitKey,
                        ParquetPageRowLimitDefault))),
                
parquet.WithDictionaryPageSizeLimit(int64(props.GetInt(ParquetDictSizeBytesKey,
diff --git a/table/internal/parquet_files_test.go 
b/table/internal/parquet_files_test.go
index 42adc22c..744b0f3b 100644
--- a/table/internal/parquet_files_test.go
+++ b/table/internal/parquet_files_test.go
@@ -528,3 +528,71 @@ func TestWriteDataFileErrOnClose(t *testing.T) {
        }, []arrow.RecordBatch{rec})
        require.ErrorContains(t, err, "error on close")
 }
+
+func TestGetWritePropertiesPageVersion(t *testing.T) {
+       tests := []struct {
+               name             string
+               props            iceberg.Properties
+               expectedPageType file.PageType
+       }{
+               {
+                       name:             "default is v2",
+                       props:            iceberg.Properties{},
+                       expectedPageType: file.PageTypeDataPageV2,
+               },
+               {
+                       name:             "explicit v2",
+                       props:            
iceberg.Properties{internal.ParquetPageVersionKey: "2"},
+                       expectedPageType: file.PageTypeDataPageV2,
+               },
+               {
+                       name:             "explicit v1",
+                       props:            
iceberg.Properties{internal.ParquetPageVersionKey: "1"},
+                       expectedPageType: file.PageTypeDataPage,
+               },
+               {
+                       name:             "invalid falls back to v2",
+                       props:            
iceberg.Properties{internal.ParquetPageVersionKey: "invalid"},
+                       expectedPageType: file.PageTypeDataPageV2,
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       mem := 
memory.NewCheckedAllocator(memory.DefaultAllocator)
+                       defer mem.AssertSize(t, 0)
+
+                       format := internal.GetFileFormat(iceberg.ParquetFile)
+                       writeProps := 
format.GetWriteProperties(tt.props).([]parquet.WriterProperty)
+
+                       root, err := schema.NewGroupNode("schema", 
parquet.Repetitions.Required, schema.FieldList{
+                               schema.NewInt32Node("col", 
parquet.Repetitions.Required, -1),
+                       }, -1)
+                       require.NoError(t, err)
+
+                       var buf bytes.Buffer
+                       pw := file.NewParquetWriter(&buf, root, 
file.WithWriterProps(
+                               parquet.NewWriterProperties(writeProps...),
+                       ))
+
+                       rgw := pw.AppendRowGroup()
+                       cw, _ := rgw.NextColumn()
+                       cw.(*file.Int32ColumnChunkWriter).WriteBatch(
+                               []int32{1, 2, 3}, nil, nil,
+                       )
+                       cw.Close()
+                       rgw.Close()
+                       pw.Close()
+
+                       rdr, err := 
file.NewParquetReader(bytes.NewReader(buf.Bytes()))
+                       require.NoError(t, err)
+                       defer rdr.Close()
+
+                       pageRdr, err := rdr.RowGroup(0).GetColumnPageReader(0)
+                       require.NoError(t, err)
+
+                       require.True(t, pageRdr.Next())
+                       assert.Equal(t, tt.expectedPageType, 
pageRdr.Page().Type())
+               })
+       }
+}
diff --git a/table/properties.go b/table/properties.go
index 63c91761..ae9ed093 100644
--- a/table/properties.go
+++ b/table/properties.go
@@ -47,6 +47,8 @@ const (
        ParquetPageRowLimitDefault               = 
internal.ParquetPageRowLimitDefault
        ParquetDictSizeBytesKey                  = 
internal.ParquetDictSizeBytesKey
        ParquetDictSizeBytesDefault              = 
internal.ParquetDictSizeBytesDefault
+       ParquetPageVersionKey                    = 
internal.ParquetPageVersionKey
+       ParquetPageVersionDefault                = 
internal.ParquetPageVersionDefault
        ParquetCompressionKey                    = 
internal.ParquetCompressionKey
        ParquetCompressionDefault                = 
internal.ParquetCompressionDefault
        ParquetCompressionLevelKey               = 
internal.ParquetCompressionLevelKey

Reply via email to