This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new cfb21e62 feat(table): configure Parquet DataPage version (#812)
cfb21e62 is described below
commit cfb21e62d8f0017b23f0178e560c5c515c2239d3
Author: Andrei Tserakhau <[email protected]>
AuthorDate: Thu Mar 26 23:03:58 2026 +0100
feat(table): configure Parquet DataPage version (#812)
Add write.parquet.page-version property ("1" or "2", default "2").
Restores compatibility with consumers that don't support DataPage V2
(e.g. Snowflake). Default keeps current V2 behavior for backward compat.
Fix: https://github.com/apache/iceberg-go/issues/800
---
table/internal/parquet_files.go | 18 +++++++++-
table/internal/parquet_files_test.go | 68 ++++++++++++++++++++++++++++++++++++
table/properties.go | 2 ++
3 files changed, 87 insertions(+), 1 deletion(-)
diff --git a/table/internal/parquet_files.go b/table/internal/parquet_files.go
index 1a80fd57..a78c2d34 100644
--- a/table/internal/parquet_files.go
+++ b/table/internal/parquet_files.go
@@ -22,6 +22,7 @@ import (
"errors"
"fmt"
"io"
+ "log/slog"
"maps"
"slices"
"strconv"
@@ -55,6 +56,8 @@ const (
ParquetPageRowLimitDefault = 20000
ParquetDictSizeBytesKey =
"write.parquet.dict-size-bytes"
ParquetDictSizeBytesDefault = 2 * 1024 * 1024 // 2 MB
+ ParquetPageVersionKey = "write.parquet.page-version"
+ ParquetPageVersionDefault = "2"
ParquetCompressionKey =
"write.parquet.compression-codec"
ParquetCompressionDefault = "zstd"
ParquetCompressionLevelKey =
"write.parquet.compression-level"
@@ -201,13 +204,26 @@ func (parquetFormat) PrimitiveTypeToPhysicalType(typ
iceberg.PrimitiveType) stri
}
func (parquetFormat) GetWriteProperties(props iceberg.Properties) any {
+ pageVersion := props.Get(ParquetPageVersionKey,
ParquetPageVersionDefault)
+
+ var dpVersion parquet.DataPageVersion
+ switch pageVersion {
+ case "1":
+ dpVersion = parquet.DataPageV1
+ case "2":
+ dpVersion = parquet.DataPageV2
+ default:
+ slog.Warn("unrecognized data page version, falling back to v2",
"version", pageVersion)
+ dpVersion = parquet.DataPageV2
+ }
+
writerProps := []parquet.WriterProperty{
parquet.WithDictionaryDefault(false),
parquet.WithMaxRowGroupLength(int64(props.GetInt(ParquetRowGroupLimitKey,
ParquetRowGroupLimitDefault))),
parquet.WithDataPageSize(int64(props.GetInt(ParquetPageSizeBytesKey,
ParquetPageSizeBytesDefault))),
- parquet.WithDataPageVersion(parquet.DataPageV2),
+ parquet.WithDataPageVersion(dpVersion),
parquet.WithBatchSize(int64(props.GetInt(ParquetPageRowLimitKey,
ParquetPageRowLimitDefault))),
parquet.WithDictionaryPageSizeLimit(int64(props.GetInt(ParquetDictSizeBytesKey,
diff --git a/table/internal/parquet_files_test.go
b/table/internal/parquet_files_test.go
index 42adc22c..744b0f3b 100644
--- a/table/internal/parquet_files_test.go
+++ b/table/internal/parquet_files_test.go
@@ -528,3 +528,71 @@ func TestWriteDataFileErrOnClose(t *testing.T) {
}, []arrow.RecordBatch{rec})
require.ErrorContains(t, err, "error on close")
}
+
+func TestGetWritePropertiesPageVersion(t *testing.T) {
+ tests := []struct {
+ name string
+ props iceberg.Properties
+ expectedPageType file.PageType
+ }{
+ {
+ name: "default is v2",
+ props: iceberg.Properties{},
+ expectedPageType: file.PageTypeDataPageV2,
+ },
+ {
+ name: "explicit v2",
+ props:
iceberg.Properties{internal.ParquetPageVersionKey: "2"},
+ expectedPageType: file.PageTypeDataPageV2,
+ },
+ {
+ name: "explicit v1",
+ props:
iceberg.Properties{internal.ParquetPageVersionKey: "1"},
+ expectedPageType: file.PageTypeDataPage,
+ },
+ {
+ name: "invalid falls back to v2",
+ props:
iceberg.Properties{internal.ParquetPageVersionKey: "invalid"},
+ expectedPageType: file.PageTypeDataPageV2,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ mem :=
memory.NewCheckedAllocator(memory.DefaultAllocator)
+ defer mem.AssertSize(t, 0)
+
+ format := internal.GetFileFormat(iceberg.ParquetFile)
+ writeProps :=
format.GetWriteProperties(tt.props).([]parquet.WriterProperty)
+
+ root, err := schema.NewGroupNode("schema",
parquet.Repetitions.Required, schema.FieldList{
+ schema.NewInt32Node("col",
parquet.Repetitions.Required, -1),
+ }, -1)
+ require.NoError(t, err)
+
+ var buf bytes.Buffer
+ pw := file.NewParquetWriter(&buf, root,
file.WithWriterProps(
+ parquet.NewWriterProperties(writeProps...),
+ ))
+
+ rgw := pw.AppendRowGroup()
+ cw, _ := rgw.NextColumn()
+ cw.(*file.Int32ColumnChunkWriter).WriteBatch(
+ []int32{1, 2, 3}, nil, nil,
+ )
+ cw.Close()
+ rgw.Close()
+ pw.Close()
+
+ rdr, err :=
file.NewParquetReader(bytes.NewReader(buf.Bytes()))
+ require.NoError(t, err)
+ defer rdr.Close()
+
+ pageRdr, err := rdr.RowGroup(0).GetColumnPageReader(0)
+ require.NoError(t, err)
+
+ require.True(t, pageRdr.Next())
+ assert.Equal(t, tt.expectedPageType,
pageRdr.Page().Type())
+ })
+ }
+}
diff --git a/table/properties.go b/table/properties.go
index 63c91761..ae9ed093 100644
--- a/table/properties.go
+++ b/table/properties.go
@@ -47,6 +47,8 @@ const (
ParquetPageRowLimitDefault =
internal.ParquetPageRowLimitDefault
ParquetDictSizeBytesKey =
internal.ParquetDictSizeBytesKey
ParquetDictSizeBytesDefault =
internal.ParquetDictSizeBytesDefault
+ ParquetPageVersionKey =
internal.ParquetPageVersionKey
+ ParquetPageVersionDefault =
internal.ParquetPageVersionDefault
ParquetCompressionKey =
internal.ParquetCompressionKey
ParquetCompressionDefault =
internal.ParquetCompressionDefault
ParquetCompressionLevelKey =
internal.ParquetCompressionLevelKey